"""Vendor BMC tools: Redfish/HTTPS, Dell racadm, Supermicro SMCIPMITool.
**Security:** all handlers require ``UNSANDBOXED_EXEC``. No shell; argv lists only
for subprocess tools. Redfish uses enum actions mapped to fixed paths.
Remote HPE iLO is accessed via Redfish (not ``hponcfg``, which is local OS only).
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import shutil
from typing import Any
from urllib.parse import quote
import httpx
from tools._safe_http import safe_http_request, safe_httpx_client
from tools._credential_profile_store import (
delete_profile as _cred_delete,
list_profile_names as _cred_list,
load_profile as _cred_load,
merge_profile as _cred_merge,
save_profile as _cred_save,
)
from tools.ipmi_tools import _check_priv, _host_ok, _user_ok
logger = logging.getLogger(__name__)
# Redis: stargazer:{prefix}_credentials:{user_id}
_PREFIX_BMC_REDFISH = "bmc_redfish"
_PREFIX_IDRAC_RACADM = "idrac_racadm"
_PREFIX_SMC_SUPERMICRO = "smc_supermicro"
_MAX_STDOUT = 200_000
_MAX_STDERR = 20_000
_MAX_JSON_BODY = 2_000_000
# --- Redfish -----------------------------------------------------------------
_REDFISH_ACTIONS = frozenset(
{
"service_root",
"systems_collection",
"system_detail",
"chassis_collection",
"chassis_detail",
"managers_collection",
"manager_detail",
"thermal",
"chassis_power",
"processors_memory",
"network_adapters",
"log_services",
"log_entries",
"software_inventory",
"update_service",
"boot_properties",
"boot_override",
"system_reset",
"virtual_media_collection",
}
)
_RESET_TYPES = frozenset(
{
"On",
"ForceOff",
"GracefulShutdown",
"GracefulRestart",
"ForceRestart",
"ForcePowerCycle",
"Nmi",
"PowerCycle",
"PowerOn",
"ColdReset",
"WarmReset",
}
)
def _bmc_base_url(host: str, port: int | None) -> str:
"""Build the HTTPS base URL for a BMC's Redfish endpoint.
Normalises a host that may already carry an ``http(s)://`` scheme, otherwise
defaults to ``https://`` and appends the port when one is supplied. Trailing
slashes are stripped so callers can concatenate ``/redfish/v1/...`` paths.
Called by :func:`run_bmc_redfish` to derive ``base`` before any Redfish
request; no other callers exist.
Args:
host: BMC hostname or IP, optionally prefixed with a scheme.
port: Optional HTTPS port; ignored when ``None`` or not positive.
Returns:
str: The normalised base URL without a trailing slash, or ``""`` when
``host`` is empty/whitespace.
"""
h = (host or "").strip()
if not h:
return ""
if h.startswith("https://") or h.startswith("http://"):
return h.rstrip("/")
if port is not None and port > 0:
return f"https://{h}:{int(port)}"
return f"https://{h}"
def _port_ok(port: int | None) -> bool:
"""Validate that an optional TCP port is within the legal range.
Treats ``None`` as valid (meaning "use the default"); otherwise requires a
value in ``1..65535``.
Called by :func:`run_bmc_redfish` during input validation; no other callers.
Args:
port: Port number to check, or ``None`` to accept the default.
Returns:
bool: ``True`` if ``None`` or a port in ``1..65535``, else ``False``.
"""
if port is None:
return True
return 1 <= int(port) <= 65535
def _bad_pw(s: str | None) -> bool:
"""Report whether a password contains characters that must be rejected.
Rejects ``None`` outright and any string containing a newline, carriage
return, or NUL byte. These could break header/credential framing when the
password is passed to ``httpx`` BasicAuth or to subprocess argv, so callers
refuse such values.
Called by :func:`run_bmc_redfish` while validating the supplied password; no
other callers.
Args:
s: Candidate password, or ``None``.
Returns:
bool: ``True`` if the value is unusable (``None`` or contains a
control character), else ``False``.
"""
if s is None:
return True
return any(c in str(s) for c in "\n\r\x00")
def _bad_token(s: str) -> bool:
"""Report whether an identifier token contains forbidden control characters.
Used to sanitise caller-supplied Redfish member ids (system/chassis/manager)
before they are interpolated into request URLs, rejecting newline, carriage
return, and NUL bytes. An empty token is considered acceptable here (callers
handle emptiness separately, e.g. by auto-discovery).
Note: this name shadows the unrelated ``_bad_token`` defined in other tool
modules; this module imports its own host/user validators from
``tools.ipmi_tools`` but defines this token check locally. Called by
:func:`_resolve_system_id`, :func:`_resolve_chassis_id`, and
:func:`_resolve_manager_id`.
Args:
s: Identifier token to validate.
Returns:
bool: ``True`` if ``s`` is non-empty and contains a control character,
else ``False``.
"""
if not s:
return False
return any(c in s for c in "\n\r\x00")
async def _redfish_json(
client: httpx.AsyncClient,
method: str,
url: str,
*,
json_body: dict[str, Any] | None = None,
) -> tuple[int, Any]:
"""Issue one SSRF-guarded Redfish HTTP request and parse the JSON reply.
Supports ``GET``/``POST``/``PATCH`` only; ``POST``/``PATCH`` send
``json_body`` (defaulting to ``{}``). The response is decoded as JSON when
the ``Content-Type`` mentions json or the body starts with ``{``, otherwise
the raw text is returned under a ``raw`` key. Both the parsed body and any
raw text are capped at ``_MAX_JSON_BODY`` bytes.
Delegates the actual transfer to :func:`tools._safe_http.safe_http_request`
with ``allow_private=True`` (BMCs typically live on private/management
networks) and ``max_redirects=5``; network and validation failures are
caught and surfaced as a result rather than raised. Called throughout
:func:`run_bmc_redfish` for every Redfish read/write and by
:func:`_first_odata_member_id` during id discovery.
Args:
client: An ``httpx.AsyncClient`` (from :func:`safe_httpx_client`) already
carrying BasicAuth and TLS settings.
method: HTTP verb; one of ``GET``, ``POST``, ``PATCH`` (case-insensitive).
url: Fully-qualified Redfish URL to request.
json_body: Optional JSON payload for ``POST``/``PATCH`` requests.
Returns:
tuple[int, Any]: ``(status_code, body)`` where ``body`` is the parsed
JSON object, a ``{"raw": text}`` fallback, or a ``{"error": ...}`` dict.
The status is ``400`` for an unsupported method and ``-1`` when the
request raised a ``ValueError`` (e.g. SSRF block) or ``httpx.HTTPError``.
"""
try:
m = method.upper()
if m == "GET":
r = await safe_http_request(
client,
"GET",
url,
allow_private=True,
max_redirects=5,
)
elif m == "POST":
r = await safe_http_request(
client,
"POST",
url,
json=json_body or {},
allow_private=True,
max_redirects=5,
)
elif m == "PATCH":
r = await safe_http_request(
client,
"PATCH",
url,
json=json_body or {},
allow_private=True,
max_redirects=5,
)
else:
return 400, {"error": f"unsupported method {method}"}
except ValueError as exc:
return -1, {"error": str(exc)}
except httpx.HTTPError as exc:
return -1, {"error": str(exc)}
ct = (r.headers.get("content-type") or "").lower()
text = r.text[:_MAX_JSON_BODY]
if "json" in ct or text.strip().startswith("{"):
try:
return r.status_code, json.loads(text) if text.strip() else {}
except Exception:
return r.status_code, {"raw": text}
return r.status_code, {"raw": text}
async def _first_odata_member_id(
client: httpx.AsyncClient,
base: str,
collection_path: str,
) -> str | None:
"""Fetch a Redfish collection and return the id of its first member.
GETs the collection, then reads the trailing path segment of the first
``Members[0]["@odata.id"]`` value (e.g. ``"1"`` from
``/redfish/v1/Systems/1``). Returns ``None`` if the collection is missing,
non-200, empty, or malformed.
Issues the request via :func:`_redfish_json`. Called by
:func:`_resolve_system_id`, :func:`_resolve_chassis_id`, and
:func:`_resolve_manager_id` to auto-discover a default id when the caller
did not supply one.
Args:
client: Authenticated ``httpx.AsyncClient`` for the BMC.
base: Normalised BMC base URL from :func:`_bmc_base_url`.
collection_path: Collection path appended to ``base`` (e.g.
``/redfish/v1/Systems``).
Returns:
str | None: The first member's id, or ``None`` when it cannot be
discovered.
"""
code, data = await _redfish_json(client, "GET", f"{base}{collection_path}")
if code != 200 or not isinstance(data, dict):
return None
members = data.get("Members") or []
if not members:
return None
first = members[0]
if not isinstance(first, dict):
return None
oid = first.get("@odata.id")
if not oid or not isinstance(oid, str):
return None
return oid.rstrip("/").split("/")[-1]
async def _resolve_system_id(
client: httpx.AsyncClient,
base: str,
explicit: str | None,
) -> tuple[str | None, str | None]:
"""Resolve the Redfish Systems member id to operate on.
Uses ``explicit`` when provided (after validating it with
:func:`_bad_token`), otherwise auto-discovers the first member via
:func:`_first_odata_member_id` against ``/redfish/v1/Systems``.
Called by :func:`run_bmc_redfish` before any system-scoped action
(``system_detail``, ``processors_memory``, ``network_adapters``,
``log_services``, ``log_entries``, ``boot_properties``, ``boot_override``,
``system_reset``).
Args:
client: Authenticated ``httpx.AsyncClient`` for the BMC.
base: Normalised BMC base URL.
explicit: Caller-supplied system id, or ``None``/empty to auto-detect.
Returns:
tuple[str | None, str | None]: ``(system_id, None)`` on success, or
``(None, error_message)`` when the id is invalid or undiscoverable.
"""
if explicit and explicit.strip():
sid = explicit.strip()
if _bad_token(sid):
return None, "Invalid system_id."
return sid, None
sid = await _first_odata_member_id(client, base, "/redfish/v1/Systems")
if not sid:
return None, "Could not discover a Systems member id."
return sid, None
async def _resolve_chassis_id(
client: httpx.AsyncClient,
base: str,
explicit: str | None,
) -> tuple[str | None, str | None]:
"""Resolve the Redfish Chassis member id to operate on.
Uses ``explicit`` when given (validated with :func:`_bad_token`), otherwise
auto-discovers the first member via :func:`_first_odata_member_id` against
``/redfish/v1/Chassis``.
Called by :func:`run_bmc_redfish` before chassis-scoped actions
(``chassis_detail``, ``thermal``, ``chassis_power``).
Args:
client: Authenticated ``httpx.AsyncClient`` for the BMC.
base: Normalised BMC base URL.
explicit: Caller-supplied chassis id, or ``None``/empty to auto-detect.
Returns:
tuple[str | None, str | None]: ``(chassis_id, None)`` on success, or
``(None, error_message)`` when invalid or undiscoverable.
"""
if explicit and explicit.strip():
cid = explicit.strip()
if _bad_token(cid):
return None, "Invalid chassis_id."
return cid, None
cid = await _first_odata_member_id(client, base, "/redfish/v1/Chassis")
if not cid:
return None, "Could not discover a Chassis member id."
return cid, None
async def _resolve_manager_id(
client: httpx.AsyncClient,
base: str,
explicit: str | None,
) -> tuple[str | None, str | None]:
"""Resolve the Redfish Managers member id to operate on.
Uses ``explicit`` when given (validated with :func:`_bad_token`), otherwise
auto-discovers the first member via :func:`_first_odata_member_id` against
``/redfish/v1/Managers``.
Called by :func:`run_bmc_redfish` for ``manager_detail`` and (twice) for
``virtual_media_collection``, since virtual media hangs off a manager.
Args:
client: Authenticated ``httpx.AsyncClient`` for the BMC.
base: Normalised BMC base URL.
explicit: Caller-supplied manager id, or ``None``/empty to auto-detect.
Returns:
tuple[str | None, str | None]: ``(manager_id, None)`` on success, or
``(None, error_message)`` when invalid or undiscoverable.
"""
if explicit and explicit.strip():
mid = explicit.strip()
if _bad_token(mid):
return None, "Invalid manager_id."
return mid, None
mid = await _first_odata_member_id(client, base, "/redfish/v1/Managers")
if not mid:
return None, "Could not discover a Managers member id."
return mid, None
def _truncate_payload(obj: Any) -> Any:
"""Recursively bound the size of a parsed Redfish payload before returning it.
Caps dicts at the first 500 keys, lists at the first 2000 elements, and any
string longer than 50,000 characters, appending an ellipsis marker to
truncated strings. This keeps a verbose BMC response from overrunning the
tool result that is serialised back to the model.
Called by :func:`run_bmc_redfish` to wrap every ``data``/``boot``/
``processors``/``memory`` value placed in the JSON result; recurses into
itself for nested structures.
Args:
obj: Arbitrary JSON-decoded value (dict, list, str, or scalar).
Returns:
Any: A size-bounded copy of ``obj`` with the same structure; scalars and
short strings are returned unchanged.
"""
if isinstance(obj, dict):
return {k: _truncate_payload(v) for k, v in list(obj.items())[:500]}
if isinstance(obj, list):
return [_truncate_payload(x) for x in obj[:2000]]
if isinstance(obj, str) and len(obj) > 50_000:
return obj[:50_000] + "…(truncated)"
return obj
[docs]
async def run_bmc_redfish(
host: str,
user: str,
password: str,
action: str,
ctx: Any = None,
*,
credential_profile: str = "",
system_id: str = "",
chassis_id: str = "",
manager_id: str = "",
reset_type: str = "ForceRestart",
boot_override_enabled: str = "Once",
boot_override_target: str = "Pxe",
verify_ssl: bool = False,
port: int | None = None,
timeout: float = 120.0,
) -> str:
"""Perform one allowlisted Redfish operation over HTTPS against a BMC.
Backing implementation of the ``bmc_redfish`` tool, covering HPE iLO, Dell
iDRAC (Redfish mode), Lenovo XCC, and many Supermicro/Cisco controllers.
Dispatches on ``action`` — an enum drawn from ``_REDFISH_ACTIONS`` — to a
fixed Redfish path (service root, systems/chassis/managers inventory,
thermal and power readings, log services and entries, firmware/software
inventory, virtual-media listing, boot properties, a boot-override PATCH, or
a ComputerSystem.Reset POST). Member ids are auto-discovered when not
supplied. Only enum actions reach fixed paths; arbitrary URLs are never
issued.
When ``credential_profile`` is set it loads saved connection fields via
:func:`tools._credential_profile_store.load_profile` (aliased ``_cred_load``)
and merges them with ``_cred_merge`` before validating the host, user,
password, and port with :func:`_host_ok`, :func:`_user_ok`, :func:`_bad_pw`,
and :func:`_port_ok`. Requires ``UNSANDBOXED_EXEC`` via
:func:`tools.ipmi_tools._check_priv`. Builds the base URL with
:func:`_bmc_base_url`, opens an SSRF-guarded client from
:func:`tools._safe_http.safe_httpx_client` carrying ``httpx.BasicAuth``,
resolves ids through :func:`_resolve_system_id`, :func:`_resolve_chassis_id`,
and :func:`_resolve_manager_id`, issues each request via
:func:`_redfish_json`, and bounds every response with
:func:`_truncate_payload`. Side effects: outbound HTTPS to the BMC management
network, and (for ``boot_override``/``system_reset``) a state change on the
target server. Errors are returned as JSON, not raised.
Called by the ``tool_loader`` dispatcher as the registered ``handler`` for
the ``bmc_redfish`` entry in ``TOOLS`` (and directly in
``tests/test_credential_profile_merges.py``).
Args:
host: BMC hostname or IP, optionally with an ``http(s)://`` scheme.
user: BMC username for BasicAuth.
password: BMC password for BasicAuth.
action: Redfish operation to perform; must be in ``_REDFISH_ACTIONS``.
ctx: Tool context providing ``redis``/``user_id`` for the privilege and
credential lookups; required.
credential_profile: Optional saved profile name supplying host, user,
password, and TLS options.
system_id: Redfish Systems member id; empty auto-detects the first.
chassis_id: Chassis member id; empty auto-detects the first.
manager_id: Managers member id; empty auto-detects the first.
reset_type: ResetType for ``system_reset`` (must be in ``_RESET_TYPES``;
default ``"ForceRestart"``).
boot_override_enabled: BootSourceOverrideEnabled for ``boot_override``
(default ``"Once"``).
boot_override_target: BootSourceOverrideTarget for ``boot_override``
(default ``"Pxe"``).
verify_ssl: Whether to verify the BMC's TLS certificate (default
``False`` for self-signed BMC certs).
port: Optional HTTPS port when not 443 and not embedded in ``host``.
timeout: HTTP timeout in seconds (default 120).
Returns:
str: A JSON object carrying ``success``, ``http_status``, ``action``,
the resolved id(s), and the truncated ``data`` (or per-action payload
keys), or ``{"success": False, "error": ...}`` on a validation, auth, or
request failure.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "bmc_redfish")
if auth_err:
return auth_err
if credential_profile and str(credential_profile).strip():
loaded = await _cred_load(_PREFIX_BMC_REDFISH, credential_profile.strip(), ctx)
if isinstance(loaded, str):
return json.dumps({"success": False, "error": loaded})
merged = _cred_merge(
loaded,
{
"host": host,
"user": user,
"password": password,
"verify_ssl": verify_ssl,
"port": port,
"timeout": timeout,
},
)
host = str(merged.get("host") or "")
user = str(merged.get("user") or "")
password = str(merged.get("password") or "")
if "verify_ssl" in merged:
verify_ssl = bool(merged["verify_ssl"])
if "port" in merged:
port = merged["port"]
if "timeout" in merged:
timeout = float(merged["timeout"])
if not _host_ok(host):
return json.dumps({"success": False, "error": "Invalid or empty host."})
if not _user_ok(user):
return json.dumps({"success": False, "error": "Invalid or empty user."})
if password is None or str(password) == "" or _bad_pw(str(password)):
return json.dumps({"success": False, "error": "password is required."})
if not _port_ok(port):
return json.dumps({"success": False, "error": "Invalid port."})
act = (action or "").strip()
if act not in _REDFISH_ACTIONS:
return json.dumps(
{
"success": False,
"error": f"Invalid action. Must be one of: {', '.join(sorted(_REDFISH_ACTIONS))}.",
}
)
base = _bmc_base_url(host, port)
auth = httpx.BasicAuth(user.strip(), str(password))
async with safe_httpx_client(
auth=auth,
verify=verify_ssl,
timeout=httpx.Timeout(timeout),
) as client:
if act == "service_root":
code, body = await _redfish_json(client, "GET", f"{base}/redfish/v1/")
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"data": _truncate_payload(body),
},
default=str,
)
if act == "systems_collection":
code, body = await _redfish_json(
client, "GET", f"{base}/redfish/v1/Systems"
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"data": _truncate_payload(body),
},
default=str,
)
if act == "chassis_collection":
code, body = await _redfish_json(
client, "GET", f"{base}/redfish/v1/Chassis"
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"data": _truncate_payload(body),
},
default=str,
)
if act == "managers_collection":
code, body = await _redfish_json(
client, "GET", f"{base}/redfish/v1/Managers"
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"data": _truncate_payload(body),
},
default=str,
)
if act == "update_service":
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/UpdateService",
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"data": _truncate_payload(body),
},
default=str,
)
sid, serr = await _resolve_system_id(client, base, system_id or None)
if serr and act in {
"system_detail",
"processors_memory",
"network_adapters",
"log_services",
"log_entries",
"boot_properties",
"boot_override",
"system_reset",
}:
return json.dumps({"success": False, "error": serr})
if act == "system_detail" and sid:
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/Systems/{quote(sid, safe='')}",
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"system_id": sid,
"data": _truncate_payload(body),
},
default=str,
)
if act == "processors_memory" and sid:
proc, mem = await asyncio.gather(
_redfish_json(
client,
"GET",
f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/Processors",
),
_redfish_json(
client,
"GET",
f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/Memory",
),
)
return json.dumps(
{
"success": proc[0] == 200 and mem[0] == 200,
"http_status": {"processors": proc[0], "memory": mem[0]},
"action": act,
"system_id": sid,
"processors": _truncate_payload(proc[1]),
"memory": _truncate_payload(mem[1]),
},
default=str,
)
if act == "network_adapters" and sid:
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/NetworkAdapters",
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"system_id": sid,
"data": _truncate_payload(body),
},
default=str,
)
if act == "log_services" and sid:
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/LogServices",
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"system_id": sid,
"data": _truncate_payload(body),
},
default=str,
)
if act == "log_entries" and sid:
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/LogServices",
)
entries: list[Any] = []
if code == 200 and isinstance(body, dict):
for m in (body.get("Members") or [])[:50]:
if not isinstance(m, dict):
continue
oid = m.get("@odata.id")
if not oid or not isinstance(oid, str):
continue
ec, ed = await _redfish_json(client, "GET", f"{base}{oid}/Entries")
if ec == 200:
entries.append(
{"log_service": oid, "entries": _truncate_payload(ed)}
)
return json.dumps(
{
"success": bool(entries) or code == 200,
"http_status": code,
"action": act,
"system_id": sid,
"data": entries if entries else _truncate_payload(body),
},
default=str,
)
if act == "software_inventory":
paths = [
"/redfish/v1/UpdateService/FirmwareInventory",
"/redfish/v1/UpdateService/SoftwareInventory",
]
results: list[dict[str, Any]] = []
for p in paths:
code, body = await _redfish_json(client, "GET", f"{base}{p}")
results.append(
{"path": p, "http_status": code, "data": _truncate_payload(body)}
)
ok = any(r["http_status"] == 200 for r in results)
return json.dumps(
{
"success": ok,
"action": act,
"results": results,
},
default=str,
)
cid, cerr = await _resolve_chassis_id(client, base, chassis_id or None)
if cerr and act in {"chassis_detail", "thermal", "chassis_power"}:
return json.dumps({"success": False, "error": cerr})
if act == "chassis_detail" and cid:
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/Chassis/{quote(cid, safe='')}",
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"chassis_id": cid,
"data": _truncate_payload(body),
},
default=str,
)
if act == "thermal" and cid:
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/Chassis/{quote(cid, safe='')}/Thermal",
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"chassis_id": cid,
"data": _truncate_payload(body),
},
default=str,
)
if act == "chassis_power" and cid:
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/Chassis/{quote(cid, safe='')}/Power",
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"chassis_id": cid,
"data": _truncate_payload(body),
},
default=str,
)
mid, merr = await _resolve_manager_id(client, base, manager_id or None)
if merr and act == "manager_detail":
return json.dumps({"success": False, "error": merr})
if act == "manager_detail" and mid:
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/Managers/{quote(mid, safe='')}",
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"manager_id": mid,
"data": _truncate_payload(body),
},
default=str,
)
if act == "virtual_media_collection":
mid2, merr2 = await _resolve_manager_id(client, base, manager_id or None)
if merr2:
return json.dumps({"success": False, "error": merr2})
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/Managers/{quote(mid2, safe='')}/VirtualMedia",
)
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"manager_id": mid2,
"data": _truncate_payload(body),
},
default=str,
)
if act == "boot_properties" and sid:
code, body = await _redfish_json(
client,
"GET",
f"{base}/redfish/v1/Systems/{quote(sid, safe='')}",
)
boot = {}
if isinstance(body, dict):
boot = body.get("Boot") or {}
return json.dumps(
{
"success": code == 200,
"http_status": code,
"action": act,
"system_id": sid,
"boot": _truncate_payload(boot),
},
default=str,
)
if act == "boot_override" and sid:
patch_body = {
"Boot": {
"BootSourceOverrideEnabled": boot_override_enabled,
"BootSourceOverrideTarget": boot_override_target,
},
}
code, body = await _redfish_json(
client,
"PATCH",
f"{base}/redfish/v1/Systems/{quote(sid, safe='')}",
json_body=patch_body,
)
return json.dumps(
{
"success": code in (200, 204),
"http_status": code,
"action": act,
"system_id": sid,
"data": _truncate_payload(body),
},
default=str,
)
if act == "system_reset" and sid:
rt = (reset_type or "ForceRestart").strip()
if rt not in _RESET_TYPES:
return json.dumps(
{
"success": False,
"error": f"Invalid reset_type. Must be one of: {', '.join(sorted(_RESET_TYPES))}.",
}
)
url = f"{base}/redfish/v1/Systems/{quote(sid, safe='')}/Actions/ComputerSystem.Reset"
code, body = await _redfish_json(
client,
"POST",
url,
json_body={"ResetType": rt},
)
return json.dumps(
{
"success": code in (200, 202, 204),
"http_status": code,
"action": act,
"system_id": sid,
"reset_type": rt,
"data": _truncate_payload(body),
},
default=str,
)
return json.dumps({"success": False, "error": "Unhandled action."})
# --- Dell racadm -------------------------------------------------------------
_RACADM_ACTIONS = frozenset(
{
"getsysinfo",
"getversion",
"getsel",
"getniccfg",
"getraclog",
"gettracelog",
"getidrac",
"getsysteminfo",
"getversion_bios",
"getversion_nic",
"getversion_perc",
"jobqueue_view",
"jobqueue_view_all",
"serveraction_powerstatus",
"serveraction_powerup",
"serveraction_powerdown",
"serveraction_powercycle",
"serveraction_graceshutdown",
"serveraction_hardreset",
"serveraction_powerreset",
"storage_get_controllers",
"storage_get_physicaldisks",
"storage_get_virtualdisks",
"storage_get_batteries",
"storage_get_enclosures",
"storage_get_smarts",
"storage_get_summary",
}
)
def _racadm_tail(action: str) -> list[str] | None:
"""Map an allowlisted racadm action name to its fixed argv tail.
Returns the exact, pre-vetted ``racadm`` subcommand argument list for a known
action (e.g. ``"getsysinfo"`` -> ``["getsysinfo"]``,
``"getidrac"`` -> ``["get", "iDRAC"]``). Because only these constant tails are
ever appended to argv, no user-controlled tokens can reach the subcommand,
which is central to this tool's no-shell safety model.
Called by :func:`run_idrac_racadm` to translate the requested action and to
double-check (alongside ``_RACADM_ACTIONS``) that the action is allowed.
Args:
action: Action name; stripped of surrounding whitespace before lookup.
Returns:
list[str] | None: The fixed argv tail for the action, or ``None`` if the
action is not in the allowlist.
"""
fixed: dict[str, list[str]] = {
"getsysinfo": ["getsysinfo"],
"getversion": ["getversion"],
"getsel": ["getsel"],
"getniccfg": ["getniccfg"],
"getraclog": ["getraclog"],
"gettracelog": ["gettracelog"],
"getidrac": ["get", "iDRAC"],
"getsysteminfo": ["get", "SystemInfo"],
"getversion_bios": ["get", "BIOS"],
"getversion_nic": ["get", "NIC"],
"getversion_perc": ["get", "PERC"],
"jobqueue_view": ["jobqueue", "view"],
"jobqueue_view_all": ["jobqueue", "view", "-i", "all"],
"serveraction_powerstatus": ["serveraction", "powerstatus"],
"serveraction_powerup": ["serveraction", "powerup"],
"serveraction_powerdown": ["serveraction", "powerdown"],
"serveraction_powercycle": ["serveraction", "powercycle"],
"serveraction_graceshutdown": ["serveraction", "graceshutdown"],
"serveraction_hardreset": ["serveraction", "hardreset"],
"serveraction_powerreset": ["serveraction", "powerreset"],
"storage_get_controllers": ["storage", "get", "controllers"],
"storage_get_physicaldisks": ["storage", "get", "pdisks"],
"storage_get_virtualdisks": ["storage", "get", "vdisks"],
"storage_get_batteries": ["storage", "get", "batteries"],
"storage_get_enclosures": ["storage", "get", "enclosures"],
"storage_get_smarts": ["storage", "get", "smarts"],
"storage_get_summary": ["storage", "get", "summary"],
}
return fixed.get(action.strip())
def _racadm_available() -> bool:
"""Report whether the Dell ``racadm`` binary is on ``PATH``.
Resolves the executable with :func:`shutil.which`. Called by
:func:`run_idrac_racadm` to fail fast with a clear error when the host has no
racadm installed.
Returns:
bool: ``True`` if ``racadm`` is found on ``PATH``, else ``False``.
"""
return shutil.which("racadm") is not None
async def _run_subprocess(
argv: list[str],
timeout: float = 180.0,
) -> tuple[int, str, str]:
"""Run a subprocess from an argv list (no shell) and capture bounded output.
Spawns the command with :func:`asyncio.create_subprocess_exec` (argv only,
so no shell interpolation), waits up to ``timeout`` seconds, and kills the
process on timeout. Captured stdout/stderr are decoded as UTF-8 with
replacement and truncated to ``_MAX_STDOUT``/``_MAX_STDERR`` bytes.
Called by :func:`run_idrac_racadm` (for the ``racadm`` argv) and
:func:`run_smc_supermicro` (for the SMCIPMITool argv). This is a module-local
helper; unrelated functions of the same name exist in other modules
(``message_processor.admin_ops_commands``, ``tools.workflow_subagent_tools``)
and are not this one.
Args:
argv: Full command vector; ``argv[0]`` is the executable.
timeout: Wall-clock limit in seconds before the process is killed.
Returns:
tuple[int, str, str]: ``(exit_code, stdout, stderr)``. On timeout the
code is ``-1`` and stderr is ``"Command timed out."``; a missing
``returncode`` is also reported as ``-1``.
"""
proc = await asyncio.create_subprocess_exec(
*argv,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=timeout)
except asyncio.TimeoutError:
proc.kill()
await proc.wait()
return -1, "", "Command timed out."
code = proc.returncode if proc.returncode is not None else -1
out = stdout.decode("utf-8", errors="replace")[:_MAX_STDOUT]
err = stderr.decode("utf-8", errors="replace")[:_MAX_STDERR]
return code, out, err
[docs]
async def run_idrac_racadm(
host: str,
user: str,
password: str,
action: str,
ctx: Any = None,
*,
credential_profile: str = "",
timeout: float = 180.0,
) -> str:
"""Run one allowlisted Dell iDRAC racadm action against a remote BMC.
Backing implementation of the ``idrac_racadm`` tool. It invokes the local
``racadm`` binary in remote mode (``racadm -r`` host, ``-u`` user, ``-p``
password, then the action tail) with a fixed, pre-vetted subcommand tail
chosen from ``_RACADM_ACTIONS`` — read and power actions plus storage
``get`` views.
Because only constant argv tails (resolved by :func:`_racadm_tail`) are
appended and no shell is used, no user-controlled token can reach the
subcommand; firmware flash and destructive RAID actions are not exposed.
When ``credential_profile`` is set it loads and merges saved fields via
``_cred_load``/``_cred_merge``. Requires ``UNSANDBOXED_EXEC`` via
:func:`tools.ipmi_tools._check_priv`; checks the binary with
:func:`_racadm_available` and validates host/user/password with
:func:`_host_ok` and :func:`_user_ok`; then runs the argv through
:func:`_run_subprocess` (no shell, bounded output, timeout-killed). Side
effect: spawns the ``racadm`` process, which reaches out to the iDRAC over
the network (and may change power state for ``serveraction_*``). Errors are
returned as JSON, not raised.
Called by the ``tool_loader`` dispatcher as the registered ``handler`` for
the ``idrac_racadm`` entry in ``TOOLS``; no internal callers in this module.
Args:
host: iDRAC IP or hostname.
user: iDRAC username.
password: iDRAC password.
action: Action name; must be in ``_RACADM_ACTIONS`` and map to a tail.
ctx: Tool context providing ``redis``/``user_id``; required.
credential_profile: Optional saved profile supplying host, user,
password, and timeout.
timeout: Subprocess wall-clock timeout in seconds (default 180).
Returns:
str: A JSON object with ``success`` (true on exit code 0), ``action``,
``exit_code``, and the captured ``stdout``/``stderr``, or
``{"success": False, "error": ...}`` on a missing binary, validation, or
auth failure.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "idrac_racadm")
if auth_err:
return auth_err
if credential_profile and str(credential_profile).strip():
loaded = await _cred_load(_PREFIX_IDRAC_RACADM, credential_profile.strip(), ctx)
if isinstance(loaded, str):
return json.dumps({"success": False, "error": loaded})
merged = _cred_merge(
loaded,
{
"host": host,
"user": user,
"password": password,
"timeout": timeout,
},
)
host = str(merged.get("host") or "")
user = str(merged.get("user") or "")
password = str(merged.get("password") or "")
if "timeout" in merged:
timeout = float(merged["timeout"])
if not _racadm_available():
return json.dumps({"success": False, "error": "racadm not found in PATH."})
if not _host_ok(host):
return json.dumps({"success": False, "error": "Invalid or empty host."})
if not _user_ok(user):
return json.dumps({"success": False, "error": "Invalid or empty user."})
if password is None or str(password) == "":
return json.dumps({"success": False, "error": "password is required."})
act = (action or "").strip()
tail = _racadm_tail(act)
if act not in _RACADM_ACTIONS or tail is None:
return json.dumps(
{
"success": False,
"error": f"Invalid action. Must be one of: {', '.join(sorted(_RACADM_ACTIONS))}.",
}
)
argv = [
"racadm",
"-r",
host.strip(),
"-u",
user.strip(),
"-p",
str(password),
*tail,
]
code, out, err = await _run_subprocess(argv, timeout=timeout)
return json.dumps(
{
"success": code == 0,
"action": act,
"exit_code": code,
"stdout": out.rstrip(),
"stderr": err.rstrip(),
}
)
# --- Supermicro --------------------------------------------------------------
_SMC_ACTIONS = frozenset(
{
"ipmi_power_status",
"ipmi_power_on",
"ipmi_power_off",
"ipmi_power_cycle",
"ipmi_power_reset",
"ipmi_sensor_list",
"ipmi_fru_print",
"ipmi_lan_print",
"ipmi_sel_list",
}
)
def _smc_bin() -> str | None:
"""Locate the Supermicro SMCIPMITool executable on ``PATH``.
Probes the common binary names (``smcipmitool``, ``SMCIPMITool``,
``smcipmitool.sh``) with :func:`shutil.which` and returns the first match.
Called by :func:`run_smc_supermicro` to find the binary or fail with a clear
"not found in PATH" error.
Returns:
str | None: The resolved absolute path to the tool, or ``None`` if none
of the candidate names are found.
"""
for name in ("smcipmitool", "SMCIPMITool", "smcipmitool.sh"):
p = shutil.which(name)
if p:
return p
return None
def _smc_tail(action: str) -> list[str] | None:
"""Map an allowlisted SMCIPMITool action name to its fixed argv tail.
Returns the pre-vetted ``ipmi`` subcommand tokens for a known action (e.g.
``"ipmi_power_status"`` -> ``["ipmi", "power", "status"]``). As with
:func:`_racadm_tail`, only these constant tails are appended to argv so no
user input reaches the subcommand.
Called by :func:`run_smc_supermicro` to translate the requested action and to
confirm (with ``_SMC_ACTIONS``) that it is allowed.
Args:
action: Action name; surrounding whitespace is stripped before lookup.
Returns:
list[str] | None: The fixed argv tail, or ``None`` if the action is not
allowlisted.
"""
m: dict[str, list[str]] = {
"ipmi_power_status": ["ipmi", "power", "status"],
"ipmi_power_on": ["ipmi", "power", "on"],
"ipmi_power_off": ["ipmi", "power", "off"],
"ipmi_power_cycle": ["ipmi", "power", "cycle"],
"ipmi_power_reset": ["ipmi", "power", "reset"],
"ipmi_sensor_list": ["ipmi", "sensor", "list"],
"ipmi_fru_print": ["ipmi", "fru", "print"],
"ipmi_lan_print": ["ipmi", "lan", "print"],
"ipmi_sel_list": ["ipmi", "sel", "list"],
}
return m.get((action or "").strip())
[docs]
async def run_smc_supermicro(
host: str,
user: str,
password: str,
action: str,
ctx: Any = None,
*,
credential_profile: str = "",
timeout: float = 180.0,
) -> str:
"""Run one allowlisted Supermicro SMCIPMITool action against a BMC.
Backing implementation of the ``smc_supermicro`` tool. It invokes the
Supermicro CLI in remote IP mode (the resolved binary followed by host,
user, password, then the ``ipmi`` subcommand tokens) with a fixed
subcommand tail chosen from ``_SMC_ACTIONS`` — power, sensor, FRU, LAN, and
SEL reads. Only the constant tails resolved by :func:`_smc_tail` are
appended and no shell is used, so user input cannot reach the subcommand.
When ``credential_profile`` is set it loads and merges saved fields via
``_cred_load``/``_cred_merge``. Requires ``UNSANDBOXED_EXEC`` via
:func:`tools.ipmi_tools._check_priv`; locates the binary with
:func:`_smc_bin` and validates host/user/password with :func:`_host_ok` and
:func:`_user_ok`; then runs the argv through :func:`_run_subprocess` (no
shell, bounded output, timeout-killed). Side effect: spawns the SMCIPMITool
process, which contacts the BMC over the network (and may change power state
for the ``ipmi_power_*`` actions). Errors are returned as JSON, not raised.
Called by the ``tool_loader`` dispatcher as the registered ``handler`` for
the ``smc_supermicro`` entry in ``TOOLS``; no internal callers in this
module.
Args:
host: BMC IP address.
user: BMC username.
password: BMC password.
action: Action name; must be in ``_SMC_ACTIONS`` and map to a tail.
ctx: Tool context providing ``redis``/``user_id``; required.
credential_profile: Optional saved profile supplying host, user,
password, and timeout.
timeout: Subprocess wall-clock timeout in seconds (default 180).
Returns:
str: A JSON object with ``success`` (true on exit code 0), ``action``,
``exit_code``, and the captured ``stdout``/``stderr``, or
``{"success": False, "error": ...}`` on a missing binary, validation, or
auth failure.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "smc_supermicro")
if auth_err:
return auth_err
if credential_profile and str(credential_profile).strip():
loaded = await _cred_load(
_PREFIX_SMC_SUPERMICRO, credential_profile.strip(), ctx
)
if isinstance(loaded, str):
return json.dumps({"success": False, "error": loaded})
merged = _cred_merge(
loaded,
{
"host": host,
"user": user,
"password": password,
"timeout": timeout,
},
)
host = str(merged.get("host") or "")
user = str(merged.get("user") or "")
password = str(merged.get("password") or "")
if "timeout" in merged:
timeout = float(merged["timeout"])
binary = _smc_bin()
if not binary:
return json.dumps(
{
"success": False,
"error": "smcipmitool (SMCIPMITool) not found in PATH.",
}
)
if not _host_ok(host):
return json.dumps({"success": False, "error": "Invalid or empty host."})
if not _user_ok(user):
return json.dumps({"success": False, "error": "Invalid or empty user."})
if password is None or str(password) == "":
return json.dumps({"success": False, "error": "password is required."})
act = (action or "").strip()
tail = _smc_tail(act)
if act not in _SMC_ACTIONS or tail is None:
return json.dumps(
{
"success": False,
"error": f"Invalid action. Must be one of: {', '.join(sorted(_SMC_ACTIONS))}.",
}
)
argv = [binary, host.strip(), user.strip(), str(password), *tail]
code, out, err = await _run_subprocess(argv, timeout=timeout)
return json.dumps(
{
"success": code == 0,
"action": act,
"exit_code": code,
"stdout": out.rstrip(),
"stderr": err.rstrip(),
}
)
# --- Encrypted credential profiles -------------------------------------------
[docs]
async def bmc_redfish_save_credentials(
host: str,
user: str,
password: str,
profile: str = "default",
ctx: Any = None,
*,
verify_ssl: bool = False,
port: int | None = None,
timeout: float = 120.0,
) -> str:
"""Encrypt and persist a reusable Redfish BMC credential profile.
Bundles the connection fields (host, user, password, ``verify_ssl``, port,
timeout) so a later :func:`run_bmc_redfish` call can load them by name via
``credential_profile`` instead of re-supplying secrets each time.
Requires the ``UNSANDBOXED_EXEC`` privilege, enforced via
:func:`tools.ipmi_tools._check_priv`. Delegates storage to
:func:`tools._credential_profile_store.save_profile` (aliased ``_cred_save``)
under prefix ``bmc_redfish``, which encrypts the JSON with the per-user key
and writes it to the user's Redis hash
``stargazer:bmc_redfish_credentials:{user_id}`` (field = profile name). This
is a tool handler registered in ``TOOLS`` and is invoked by the tool
dispatcher rather than by other functions in this module.
Args:
host: BMC hostname or IP (whitespace-stripped before saving).
user: BMC username to store.
password: BMC password to store (encrypted at rest).
profile: Profile name to save under; defaults to ``"default"``.
ctx: Tool context providing ``redis`` and ``user_id``; required.
verify_ssl: Whether saved sessions should verify TLS certs.
port: Optional HTTPS port to remember.
timeout: HTTP timeout in seconds to remember.
Returns:
str: A JSON result string ``{"success": ...}`` from the credential store
(or an error envelope when ``ctx`` is missing or the privilege check
fails).
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "bmc_redfish")
if auth_err:
return auth_err
data: dict[str, Any] = {
"host": host.strip(),
"user": user,
"password": password,
"verify_ssl": bool(verify_ssl),
"port": port,
"timeout": float(timeout),
}
return await _cred_save(_PREFIX_BMC_REDFISH, profile, data, ctx)
[docs]
async def bmc_redfish_list_credentials(ctx: Any = None) -> str:
"""List the saved Redfish BMC credential profile names for the user.
Returns only the profile names, never the decrypted secrets. Requires
``UNSANDBOXED_EXEC`` via :func:`_check_priv`, then delegates to
:func:`tools._credential_profile_store.list_profile_names` (``_cred_list``),
which reads the keys of the Redis hash
``stargazer:bmc_redfish_credentials:{user_id}``. Registered in ``TOOLS`` and
called by the tool dispatcher.
Args:
ctx: Tool context providing ``redis`` and ``user_id``; required.
Returns:
str: JSON string ``{"success": True, "profiles": [...], "count": N}`` on
success, or an error envelope when ``ctx`` is missing or unprivileged.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "bmc_redfish")
if auth_err:
return auth_err
return await _cred_list(_PREFIX_BMC_REDFISH, ctx)
[docs]
async def bmc_redfish_delete_credentials(
profile: str = "default", ctx: Any = None
) -> str:
"""Delete one saved Redfish BMC credential profile for the user.
Requires ``UNSANDBOXED_EXEC`` via :func:`_check_priv`, then delegates to
:func:`tools._credential_profile_store.delete_profile` (``_cred_delete``),
which removes the ``profile`` field from the Redis hash
``stargazer:bmc_redfish_credentials:{user_id}``. Registered in ``TOOLS`` and
invoked by the tool dispatcher.
Args:
profile: Name of the profile to delete; defaults to ``"default"``.
ctx: Tool context providing ``redis`` and ``user_id``; required.
Returns:
str: JSON result string from the credential store, or an error envelope
when ``ctx`` is missing or the privilege check fails.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "bmc_redfish")
if auth_err:
return auth_err
return await _cred_delete(_PREFIX_BMC_REDFISH, profile, ctx)
[docs]
async def idrac_racadm_save_credentials(
host: str,
user: str,
password: str,
profile: str = "default",
ctx: Any = None,
*,
timeout: float = 180.0,
) -> str:
"""Encrypt and persist a reusable Dell iDRAC racadm credential profile.
Stores host, user, password, and subprocess timeout so a later
:func:`run_idrac_racadm` call can load them by ``credential_profile`` name.
Requires ``UNSANDBOXED_EXEC`` via :func:`_check_priv`, then delegates to
:func:`tools._credential_profile_store.save_profile` (``_cred_save``) under
prefix ``idrac_racadm``, encrypting the JSON to the user's Redis hash
``stargazer:idrac_racadm_credentials:{user_id}``. Registered in ``TOOLS`` and
invoked by the tool dispatcher.
Args:
host: iDRAC IP or hostname (whitespace-stripped before saving).
user: iDRAC username.
password: iDRAC password (encrypted at rest).
profile: Profile name to save under; defaults to ``"default"``.
ctx: Tool context providing ``redis`` and ``user_id``; required.
timeout: Subprocess timeout in seconds to remember.
Returns:
str: JSON result string from the credential store, or an error envelope
when ``ctx`` is missing or unprivileged.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "idrac_racadm")
if auth_err:
return auth_err
data: dict[str, Any] = {
"host": host.strip(),
"user": user,
"password": password,
"timeout": float(timeout),
}
return await _cred_save(_PREFIX_IDRAC_RACADM, profile, data, ctx)
[docs]
async def idrac_racadm_list_credentials(ctx: Any = None) -> str:
"""List the saved iDRAC racadm credential profile names for the user.
Returns names only, no secrets. Requires ``UNSANDBOXED_EXEC`` via
:func:`_check_priv`, then delegates to
:func:`tools._credential_profile_store.list_profile_names` (``_cred_list``)
reading the Redis hash ``stargazer:idrac_racadm_credentials:{user_id}``.
Registered in ``TOOLS`` and invoked by the tool dispatcher.
Args:
ctx: Tool context providing ``redis`` and ``user_id``; required.
Returns:
str: JSON ``{"success": True, "profiles": [...], "count": N}`` on
success, else an error envelope.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "idrac_racadm")
if auth_err:
return auth_err
return await _cred_list(_PREFIX_IDRAC_RACADM, ctx)
[docs]
async def idrac_racadm_delete_credentials(
profile: str = "default", ctx: Any = None
) -> str:
"""Delete one saved iDRAC racadm credential profile for the user.
Requires ``UNSANDBOXED_EXEC`` via :func:`_check_priv`, then delegates to
:func:`tools._credential_profile_store.delete_profile` (``_cred_delete``),
removing the ``profile`` field from the Redis hash
``stargazer:idrac_racadm_credentials:{user_id}``. Registered in ``TOOLS`` and
invoked by the tool dispatcher.
Args:
profile: Name of the profile to delete; defaults to ``"default"``.
ctx: Tool context providing ``redis`` and ``user_id``; required.
Returns:
str: JSON result string from the credential store, or an error envelope
when ``ctx`` is missing or unprivileged.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "idrac_racadm")
if auth_err:
return auth_err
return await _cred_delete(_PREFIX_IDRAC_RACADM, profile, ctx)
[docs]
async def smc_supermicro_save_credentials(
host: str,
user: str,
password: str,
profile: str = "default",
ctx: Any = None,
*,
timeout: float = 180.0,
) -> str:
"""Encrypt and persist a reusable Supermicro SMCIPMITool credential profile.
Stores host, user, password, and subprocess timeout so a later
:func:`run_smc_supermicro` call can load them by ``credential_profile`` name.
Requires ``UNSANDBOXED_EXEC`` via :func:`_check_priv`, then delegates to
:func:`tools._credential_profile_store.save_profile` (``_cred_save``) under
prefix ``smc_supermicro``, encrypting the JSON to the user's Redis hash
``stargazer:smc_supermicro_credentials:{user_id}``. Registered in ``TOOLS``
and invoked by the tool dispatcher.
Args:
host: BMC IP address (whitespace-stripped before saving).
user: BMC username.
password: BMC password (encrypted at rest).
profile: Profile name to save under; defaults to ``"default"``.
ctx: Tool context providing ``redis`` and ``user_id``; required.
timeout: Subprocess timeout in seconds to remember.
Returns:
str: JSON result string from the credential store, or an error envelope
when ``ctx`` is missing or unprivileged.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "smc_supermicro")
if auth_err:
return auth_err
data: dict[str, Any] = {
"host": host.strip(),
"user": user,
"password": password,
"timeout": float(timeout),
}
return await _cred_save(_PREFIX_SMC_SUPERMICRO, profile, data, ctx)
[docs]
async def smc_supermicro_list_credentials(ctx: Any = None) -> str:
"""List the saved Supermicro SMCIPMITool credential profile names.
Returns names only, no secrets. Requires ``UNSANDBOXED_EXEC`` via
:func:`_check_priv`, then delegates to
:func:`tools._credential_profile_store.list_profile_names` (``_cred_list``)
reading the Redis hash ``stargazer:smc_supermicro_credentials:{user_id}``.
Registered in ``TOOLS`` and invoked by the tool dispatcher.
Args:
ctx: Tool context providing ``redis`` and ``user_id``; required.
Returns:
str: JSON ``{"success": True, "profiles": [...], "count": N}`` on
success, else an error envelope.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "smc_supermicro")
if auth_err:
return auth_err
return await _cred_list(_PREFIX_SMC_SUPERMICRO, ctx)
[docs]
async def smc_supermicro_delete_credentials(
profile: str = "default", ctx: Any = None
) -> str:
"""Delete one saved Supermicro SMCIPMITool credential profile.
Requires ``UNSANDBOXED_EXEC`` via :func:`_check_priv`, then delegates to
:func:`tools._credential_profile_store.delete_profile` (``_cred_delete``),
removing the ``profile`` field from the Redis hash
``stargazer:smc_supermicro_credentials:{user_id}``. Registered in ``TOOLS``
and invoked by the tool dispatcher.
Args:
profile: Name of the profile to delete; defaults to ``"default"``.
ctx: Tool context providing ``redis`` and ``user_id``; required.
Returns:
str: JSON result string from the credential store, or an error envelope
when ``ctx`` is missing or unprivileged.
"""
if ctx is None:
return json.dumps({"success": False, "error": "No context."})
auth_err = await _check_priv(ctx, "smc_supermicro")
if auth_err:
return auth_err
return await _cred_delete(_PREFIX_SMC_SUPERMICRO, profile, ctx)
_BMC_REDFISH_DESC = (
"HTTPS Redfish to a BMC (HPE iLO, Dell iDRAC with Redfish, Lenovo XCC, many "
"Supermicro/Cisco controllers). Enum actions only — no arbitrary URLs. Covers "
"inventory, thermal/power readings, logs, firmware/software inventory views, "
"virtual media listing, boot properties, boot override (PATCH), and "
"ComputerSystem.Reset. Default verify_ssl=false (self-signed BMC certs); MITM "
"risk if used on untrusted networks. Not for generic ipmitool or Dell racadm — "
"use ipmi_control or idrac_racadm for those. Requires UNSANDBOXED_EXEC."
)
_IDRAC_DESC = (
"Dell iDRAC remote management via the system racadm binary (-r -u -P). "
"Allowlisted read and power actions including storage get (controllers, pdisks, "
"vdisks, etc.). Does not expose firmware flash or destructive RAID. For "
"standards-based IPMI use ipmi_control; for Redfish use bmc_redfish. Requires "
"UNSANDBOXED_EXEC and racadm in PATH."
)
_SMC_DESC = (
"Supermicro BMC via smcipmitool/SMCIPMITool (IP, user, password, then ipmi "
"subcommands). Allowlisted power, sensor, FRU, LAN, SEL. For Redfish on "
"Supermicro use bmc_redfish; for generic IPMI use ipmi_control. Requires "
"UNSANDBOXED_EXEC and the SMC tool in PATH."
)
_BMC_REDFISH_MAIN_DESC = (
_BMC_REDFISH_DESC
+ " credential_profile loads saved host, user, password, TLS options."
)
_IDRAC_MAIN_DESC = _IDRAC_DESC + " credential_profile loads saved host, user, password."
_SMC_MAIN_DESC = _SMC_DESC + " credential_profile loads saved host, user, password."
_BMC_REDFISH_PARAMS = {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": "BMC hostname or IP (optional https://). Omit if credential_profile supplies it.",
},
"user": {"type": "string", "description": "BMC username."},
"password": {"type": "string", "description": "BMC password."},
"credential_profile": {
"type": "string",
"default": "",
"description": "Load host, user, password, verify_ssl, port, timeout from a saved encrypted profile.",
},
"action": {
"type": "string",
"enum": sorted(_REDFISH_ACTIONS),
"description": "Redfish operation to perform.",
},
"system_id": {
"type": "string",
"description": "Redfish Systems member id (e.g. 1). Empty = auto-detect first.",
},
"chassis_id": {
"type": "string",
"description": "Chassis member id. Empty = auto-detect first.",
},
"manager_id": {
"type": "string",
"description": "Managers member id. Empty = auto-detect first.",
},
"reset_type": {
"type": "string",
"enum": sorted(_RESET_TYPES),
"description": "For system_reset only (ComputerSystem.Reset).",
},
"boot_override_enabled": {
"type": "string",
"description": "For boot_override: e.g. Once, Continuous, Disabled.",
},
"boot_override_target": {
"type": "string",
"description": "For boot_override: e.g. Pxe, Cd, Usb, Hdd.",
},
"verify_ssl": {
"type": "boolean",
"description": "Verify TLS certificates (default false for typical BMC certs).",
},
"port": {
"type": "integer",
"description": "Optional HTTPS port if not 443 and not in host URL.",
},
"timeout": {
"type": "number",
"description": "HTTP timeout seconds (default 120).",
},
},
"required": ["action"],
}
_IDRAC_PARAMS = {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": "iDRAC IP or hostname (optional if credential_profile set).",
},
"user": {"type": "string", "description": "iDRAC username."},
"password": {"type": "string", "description": "iDRAC password."},
"credential_profile": {
"type": "string",
"default": "",
"description": "Load host, user, password, timeout from a saved encrypted profile.",
},
"action": {
"type": "string",
"enum": sorted(_RACADM_ACTIONS),
"description": "Allowlisted racadm remote action.",
},
"timeout": {"type": "number", "description": "Subprocess timeout seconds."},
},
"required": ["action"],
}
_SMC_PARAMS = {
"type": "object",
"properties": {
"host": {
"type": "string",
"description": "BMC IP address (optional if credential_profile set).",
},
"user": {"type": "string", "description": "BMC username."},
"password": {"type": "string", "description": "BMC password."},
"credential_profile": {
"type": "string",
"default": "",
"description": "Load host, user, password, timeout from a saved encrypted profile.",
},
"action": {
"type": "string",
"enum": sorted(_SMC_ACTIONS),
"description": "Allowlisted smcipmitool ipmi action.",
},
"timeout": {"type": "number", "description": "Subprocess timeout seconds."},
},
"required": ["action"],
}
TOOLS = [
{
"name": "bmc_redfish_save_credentials",
"description": (
"Save Redfish BMC host, user, password, verify_ssl, port, and timeout encrypted "
"per-user for bmc_redfish. Requires UNSANDBOXED_EXEC."
),
"parameters": {
"type": "object",
"properties": {
"host": {"type": "string"},
"user": {"type": "string"},
"password": {"type": "string"},
"profile": {"type": "string", "default": "default"},
"verify_ssl": {"type": "boolean", "default": False},
"port": {"type": "integer", "description": "Optional HTTPS port."},
"timeout": {"type": "number", "default": 120},
},
"required": ["host", "user", "password"],
},
"handler": bmc_redfish_save_credentials,
},
{
"name": "bmc_redfish_list_credentials",
"description": "List saved Redfish BMC profile names. Requires UNSANDBOXED_EXEC.",
"parameters": {"type": "object", "properties": {}},
"handler": bmc_redfish_list_credentials,
},
{
"name": "bmc_redfish_delete_credentials",
"description": "Delete a saved Redfish BMC profile. Requires UNSANDBOXED_EXEC.",
"parameters": {
"type": "object",
"properties": {"profile": {"type": "string", "default": "default"}},
"required": ["profile"],
},
"handler": bmc_redfish_delete_credentials,
},
{
"name": "bmc_redfish",
"description": _BMC_REDFISH_MAIN_DESC,
"parameters": _BMC_REDFISH_PARAMS,
"handler": run_bmc_redfish,
},
{
"name": "idrac_racadm_save_credentials",
"description": (
"Save iDRAC racadm host, user, password, and timeout encrypted per-user. "
"Requires UNSANDBOXED_EXEC."
),
"parameters": {
"type": "object",
"properties": {
"host": {"type": "string"},
"user": {"type": "string"},
"password": {"type": "string"},
"profile": {"type": "string", "default": "default"},
"timeout": {"type": "number", "default": 180},
},
"required": ["host", "user", "password"],
},
"handler": idrac_racadm_save_credentials,
},
{
"name": "idrac_racadm_list_credentials",
"description": "List saved iDRAC racadm profile names. Requires UNSANDBOXED_EXEC.",
"parameters": {"type": "object", "properties": {}},
"handler": idrac_racadm_list_credentials,
},
{
"name": "idrac_racadm_delete_credentials",
"description": "Delete a saved iDRAC racadm profile. Requires UNSANDBOXED_EXEC.",
"parameters": {
"type": "object",
"properties": {"profile": {"type": "string", "default": "default"}},
"required": ["profile"],
},
"handler": idrac_racadm_delete_credentials,
},
{
"name": "idrac_racadm",
"description": _IDRAC_MAIN_DESC,
"parameters": _IDRAC_PARAMS,
"handler": run_idrac_racadm,
},
{
"name": "smc_supermicro_save_credentials",
"description": (
"Save Supermicro SMCIPMITool host, user, password, and timeout encrypted per-user. "
"Requires UNSANDBOXED_EXEC."
),
"parameters": {
"type": "object",
"properties": {
"host": {"type": "string"},
"user": {"type": "string"},
"password": {"type": "string"},
"profile": {"type": "string", "default": "default"},
"timeout": {"type": "number", "default": 180},
},
"required": ["host", "user", "password"],
},
"handler": smc_supermicro_save_credentials,
},
{
"name": "smc_supermicro_list_credentials",
"description": "List saved Supermicro SMC profile names. Requires UNSANDBOXED_EXEC.",
"parameters": {"type": "object", "properties": {}},
"handler": smc_supermicro_list_credentials,
},
{
"name": "smc_supermicro_delete_credentials",
"description": "Delete a saved Supermicro SMC profile. Requires UNSANDBOXED_EXEC.",
"parameters": {
"type": "object",
"properties": {"profile": {"type": "string", "default": "default"}},
"required": ["profile"],
},
"handler": smc_supermicro_delete_credentials,
},
{
"name": "smc_supermicro",
"description": _SMC_MAIN_DESC,
"parameters": _SMC_PARAMS,
"handler": run_smc_supermicro,
},
]