"""Direct Linux cgroup v2 and namespace management.
Read/write cgroup pseudo-files under the unified hierarchy, create/remove cgroup
directories, move processes via ``cgroup.procs``, and run commands under
``unshare`` or ``nsenter`` (util-linux). All operations require the
``UNSANDBOXED_EXEC`` privilege.
Paths are always canonicalized and must resolve under ``/sys/fs/cgroup`` (no
``..`` escapes to other filesystem trees).
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import shutil
from pathlib import Path
from typing import Any
from tools._stargazer_pid_cgroup import CGROUP_MOUNT
# Module-level logger; _run_exec_limited uses it to report unexpected errors.
logger = logging.getLogger(__name__)
# Default value of cgroup_read_file's ``max_bytes`` argument (256 KiB).
_MAX_READ_BYTES_DEFAULT = 256 * 1024
# Upper bound that cgroup_read_file clamps ``max_bytes`` to (512 KiB).
_MAX_READ_BYTES_CAP = 512 * 1024
# Cap on how much of each of stdout/stderr _run_exec_limited returns before it
# appends a "... [truncated]" marker.
_MAX_IO_CAPTURE = 256 * 1024
# Default ``timeout`` in seconds for the unshare/nsenter handlers.
_DEFAULT_SUBPROCESS_TIMEOUT = 120
# Upper bound in seconds that the unshare/nsenter handlers clamp ``timeout`` to.
_MAX_SUBPROCESS_TIMEOUT = 600
async def _check_priv(ctx: Any) -> str | None:
"""Verify the calling user holds the ``UNSANDBOXED_EXEC`` privilege.
Acts as the authorization gate for every handler in this module: cgroup
pseudo-file I/O and ``unshare``/``nsenter`` execution all run with the
host's elevated capabilities, so each must pass this check first. Returns
``None`` to mean "allowed" and a serialized JSON error string to mean
"denied", letting callers short-circuit with a single ``if pe: return pe``.
Lazily imports :data:`tools.alter_privileges.PRIVILEGES` and
:func:`tools.alter_privileges.has_privilege`, reads ``ctx.redis``,
``ctx.config`` and ``ctx.user_id`` off the :class:`ToolContext`, and awaits
``has_privilege`` to test the ``UNSANDBOXED_EXEC`` bit against the user's
Redis-backed privilege mask. Performs no writes; only the privilege lookup
touches Redis. Called by all seven tool handlers in this module
(``cgroup_read_file``, ``cgroup_write_file``, ``cgroup_mkdir``,
``cgroup_rmdir``, ``cgroup_move_pid``, ``linux_unshare_exec`` and
``linux_nsenter_exec``) immediately after their ``ctx is None`` guard.
Args:
ctx: The :class:`ToolContext` for the invocation; ``redis``, ``config``
and ``user_id`` attributes are read from it (missing attributes are
treated as absent / empty).
Returns:
str | None: ``None`` when the user has the privilege; otherwise a
JSON-encoded ``{"success": False, "error": ...}`` string describing the
denial (missing privilege) or that the privilege system is unavailable.
"""
try:
from tools.alter_privileges import PRIVILEGES, has_privilege
redis = getattr(ctx, "redis", None)
config = getattr(ctx, "config", None)
user_id = getattr(ctx, "user_id", "") or ""
if not await has_privilege(
redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config
):
return json.dumps(
{
"success": False,
"error": (
"The user does not have the UNSANDBOXED_EXEC privilege. "
"Ask an admin to grant it with the alter_privileges tool."
),
}
)
except ImportError:
return json.dumps({"success": False, "error": "Privilege system unavailable."})
return None
def _cgroup_mount_resolved() -> Path:
    """Return ``CGROUP_MOUNT`` after symlink resolution.

    The result is the reference directory that request paths are compared
    against: ``_resolve_under_cgroup`` requires targets to sit beneath it and
    ``cgroup_rmdir`` refuses to remove it.

    Returns:
        The canonical form of the imported ``CGROUP_MOUNT`` path.
    """
    mount_root = CGROUP_MOUNT
    return mount_root.resolve()
def _resolve_under_cgroup(raw: str) -> tuple[Path | None, str | None]:
"""Canonicalize a request path and confine it under the cgroup mount.
Acts as the security boundary shared by every cgroup pseudo-file handler:
a relative path is anchored under :data:`CGROUP_MOUNT`
(``/sys/fs/cgroup``), an absolute path is taken as-is, and the result is
fully resolved through symlinks before being checked against the resolved
mount root from :func:`_cgroup_mount_resolved`. This is what prevents
``..`` traversal or symlink tricks from reaching arbitrary files outside
the unified hierarchy.
Performs a filesystem ``resolve`` (a stat-level lookup) but no mutation.
Called by :func:`cgroup_read_file`, :func:`cgroup_write_file`,
:func:`cgroup_mkdir`, :func:`cgroup_rmdir` and :func:`cgroup_move_pid` to
validate their ``path`` / ``cgroup_path`` argument before any I/O.
Args:
raw: The user-supplied path string; whitespace is stripped and an
empty value is rejected.
Returns:
tuple[Path | None, str | None]: ``(resolved_path, None)`` when the path
is non-empty, resolvable, and lives under the cgroup mount; otherwise
``(None, error_message)`` for an empty path, an ``OSError`` during
resolution, or a target that escapes the mount root.
"""
s = (raw or "").strip()
if not s:
return None, "Empty path."
p = Path(s)
if not p.is_absolute():
p = CGROUP_MOUNT / p
try:
resolved = p.resolve()
except OSError as e:
return None, f"Could not resolve path: {e}"
mount = _cgroup_mount_resolved()
try:
resolved.relative_to(mount)
except ValueError:
return None, f"Path must be under cgroup mount {mount}."
return resolved, None
async def _run_exec_limited(
    cmd: list[str],
    *,
    timeout: int = _DEFAULT_SUBPROCESS_TIMEOUT,
) -> tuple[int, str, str]:
    """Run ``cmd`` without a shell, with a timeout and bounded captured output.

    The child is started with ``asyncio.create_subprocess_exec`` and both of
    its pipes are drained concurrently. At most ``_MAX_IO_CAPTURE`` bytes of
    each stream are kept in memory; anything beyond that is read and
    discarded, so a very chatty child cannot grow this process's memory
    without bound. A stream that exceeded the cap gets a
    ``"\\n... [truncated]"`` marker appended.

    Args:
        cmd: Full argument vector; ``cmd[0]`` is the program to execute.
        timeout: Seconds to wait for the child to finish before killing it.

    Returns:
        ``(returncode, stdout, stderr)`` with both strings decoded using
        ``errors="replace"`` and stripped. ``returncode`` is ``-1`` on
        timeout, when the executable is missing, on any other spawn/wait
        error (which is logged), or if the process reports no return code.
    """

    async def _drain(stream: Any) -> tuple[bytes, bool]:
        # Read the pipe to EOF but retain only the first _MAX_IO_CAPTURE bytes.
        kept = bytearray()
        clipped = False
        while True:
            chunk = await stream.read(65536)
            if not chunk:
                return bytes(kept), clipped
            room = _MAX_IO_CAPTURE - len(kept)
            if room > 0:
                kept += chunk[:room]
            if len(chunk) > room:
                clipped = True

    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
    except FileNotFoundError as exc:
        return -1, "", f"Executable not found: {exc}"
    except Exception as exc:
        logger.error("_run_exec_limited %s: %s", cmd, exc, exc_info=True)
        return -1, "", str(exc)

    try:
        # Draining both pipes while waiting avoids the pipe-buffer deadlock
        # that a bare proc.wait() would risk with stdout/stderr=PIPE.
        (stdout_b, out_clipped), (stderr_b, err_clipped), _ = await asyncio.wait_for(
            asyncio.gather(
                _drain(proc.stdout),
                _drain(proc.stderr),
                proc.wait(),
            ),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The child exited between the timeout firing and the kill.
            pass
        return -1, "", f"Command timed out after {timeout}s."
    except Exception as exc:
        logger.error("_run_exec_limited %s: %s", cmd, exc, exc_info=True)
        return -1, "", str(exc)

    out = stdout_b.decode(errors="replace")
    err = stderr_b.decode(errors="replace")
    if out_clipped:
        out += "\n... [truncated]"
    if err_clipped:
        err += "\n... [truncated]"
    rc = proc.returncode
    if rc is None:
        rc = -1
    return rc, out.strip(), err.strip()
def _json_ok(**extra: Any) -> str:
"""Serialize a success envelope to a JSON string.
Builds ``{"success": True, **extra}`` and dumps it via the module's
``jsonutil`` (imported as ``json``), giving every handler a uniform
success shape on top of which it layers result fields (e.g. ``path``,
``content``, ``returncode``). Pure formatting; no I/O or side effects.
Called on the success path of all seven tool handlers in this module.
Args:
**extra: Arbitrary result fields merged into the envelope alongside
``success``.
Returns:
str: The JSON-encoded success envelope.
"""
payload: dict[str, Any] = {"success": True, **extra}
return json.dumps(payload)
def _json_err(msg: str) -> str:
"""Serialize a failure envelope to a JSON string.
Dumps ``{"success": False, "error": msg}`` via ``jsonutil`` so every
handler reports errors in one consistent shape. Pure formatting; no I/O or
side effects. Called throughout this module for missing context, path
validation failures, and OS errors from the cgroup/subprocess operations.
Args:
msg: Human-readable error message placed under the ``error`` key.
Returns:
str: The JSON-encoded failure envelope.
"""
return json.dumps({"success": False, "error": msg})
# ---------------------------------------------------------------------------
# Handlers
# ---------------------------------------------------------------------------
[docs]
async def cgroup_read_file(
    path: str,
    max_bytes: int = _MAX_READ_BYTES_DEFAULT,
    ctx: Any = None,
) -> str:
    """Read a file under the cgroup mount and return its content as text.

    The caller must pass ``_check_priv`` and ``path`` must pass
    ``_resolve_under_cgroup``. At most ``max_bytes`` bytes (plus one probe
    byte used to detect truncation) are read from the file, so neither the
    response nor this process's memory grows with the size of the target.

    Args:
        path: File path, absolute or relative to the cgroup mount.
        max_bytes: Maximum number of bytes to return; clamped to
            ``[1, _MAX_READ_BYTES_CAP]``.
        ctx: Invocation context; required for the privilege check.

    Returns:
        A JSON success envelope with ``path``, ``content`` (decoded with
        ``errors="replace"``, suffixed with ``"\\n... [truncated]"`` when the
        file is longer than the limit) and ``bytes_read``; or a JSON error
        envelope for a missing context, denied privilege, non-integer
        ``max_bytes``, rejected path, or ``OSError`` while reading.
    """
    if ctx is None:
        return _json_err("No context.")
    pe = await _check_priv(ctx)
    if pe:
        return pe
    try:
        limit = int(max_bytes)
    except (TypeError, ValueError, OverflowError):
        # Keep the handler's "always returns a JSON envelope" contract.
        return _json_err("max_bytes must be an integer.")
    limit = max(1, min(limit, _MAX_READ_BYTES_CAP))
    resolved, err = _resolve_under_cgroup(path)
    if err:
        return _json_err(err)
    try:
        with resolved.open("rb") as fh:
            # One extra byte tells us whether there was more than `limit`.
            data = fh.read(limit + 1)
    except OSError as e:
        return _json_err(f"Read failed: {e}")
    if len(data) > limit:
        text = data[:limit].decode(errors="replace") + "\n... [truncated]"
    else:
        text = data.decode(errors="replace")
    return _json_ok(
        path=str(resolved), content=text, bytes_read=min(len(data), limit)
    )
[docs]
async def cgroup_write_file(
    path: str,
    content: str,
    ctx: Any = None,
) -> str:
    """Write UTF-8 text to a file under the cgroup mount.

    The caller must pass ``_check_priv`` and ``path`` must pass
    ``_resolve_under_cgroup``; the text is then written with
    ``Path.write_text``.

    Args:
        path: Target file, absolute or relative to the cgroup mount.
        content: Text written verbatim, encoded as UTF-8.
        ctx: Invocation context; required for the privilege check.

    Returns:
        A JSON success envelope with ``path`` and ``bytes_written`` (the
        UTF-8 encoded length of ``content``), or a JSON error envelope for a
        missing context, denied privilege, rejected path, or ``OSError``
        raised by the write.
    """
    if ctx is None:
        return _json_err("No context.")

    denied = await _check_priv(ctx)
    if denied:
        return denied

    target, problem = _resolve_under_cgroup(path)
    if problem:
        return _json_err(problem)

    try:
        target.write_text(content, encoding="utf-8")
    except OSError as exc:
        return _json_err(f"Write failed: {exc}")
    else:
        return _json_ok(
            path=str(target),
            bytes_written=len(content.encode("utf-8")),
        )
[docs]
async def cgroup_mkdir(
    path: str,
    parents: bool = True,
    ctx: Any = None,
) -> str:
    """Create a directory under the cgroup mount.

    The caller must pass ``_check_priv`` and ``path`` must pass
    ``_resolve_under_cgroup``. The directory is created with
    ``Path.mkdir(parents=parents, exist_ok=True)``, so an already existing
    directory is not an error.

    Args:
        path: Directory to create, absolute or relative to the cgroup mount.
        parents: Whether missing parent directories are created too.
        ctx: Invocation context; required for the privilege check.

    Returns:
        A JSON success envelope with the resolved ``path``, or a JSON error
        envelope for a missing context, denied privilege, rejected path, or
        ``OSError`` raised by the mkdir.
    """
    if ctx is None:
        return _json_err("No context.")

    denied = await _check_priv(ctx)
    if denied:
        return denied

    target, problem = _resolve_under_cgroup(path)
    if problem:
        return _json_err(problem)

    try:
        target.mkdir(parents=parents, exist_ok=True)
    except OSError as exc:
        return _json_err(f"mkdir failed: {exc}")
    else:
        return _json_ok(path=str(target))
[docs]
async def cgroup_rmdir(
    path: str,
    ctx: Any = None,
) -> str:
    """Remove a directory under the cgroup mount with ``Path.rmdir``.

    The caller must pass ``_check_priv`` and ``path`` must pass
    ``_resolve_under_cgroup``. A path that resolves to the mount root itself
    is refused before any removal is attempted.

    Args:
        path: Directory to remove, absolute or relative to the cgroup mount.
        ctx: Invocation context; required for the privilege check.

    Returns:
        A JSON success envelope with the resolved ``path``, or a JSON error
        envelope for a missing context, denied privilege, rejected path, an
        attempt to remove the mount root, or ``OSError`` raised by the rmdir.
    """
    if ctx is None:
        return _json_err("No context.")

    denied = await _check_priv(ctx)
    if denied:
        return denied

    target, problem = _resolve_under_cgroup(path)
    if problem:
        return _json_err(problem)

    mount_root = _cgroup_mount_resolved()
    if target == mount_root:
        return _json_err("Refusing to remove the cgroup mount root.")

    try:
        target.rmdir()
    except OSError as exc:
        return _json_err(f"rmdir failed (directory must be empty): {exc}")
    else:
        return _json_ok(path=str(target))
[docs]
async def cgroup_move_pid(
    cgroup_path: str,
    pid: int,
    ctx: Any = None,
) -> str:
    """Write a PID into the ``cgroup.procs`` file of a cgroup directory.

    The caller must pass ``_check_priv``, ``cgroup_path`` must pass
    ``_resolve_under_cgroup`` and must be an existing directory. ``pid`` is
    converted to ``int`` up front so that a malformed value is reported as a
    JSON error instead of raising out of the handler.

    Args:
        cgroup_path: Target cgroup directory, absolute or relative to the
            cgroup mount.
        pid: Process ID to write; anything ``int()`` accepts.
        ctx: Invocation context; required for the privilege check.

    Returns:
        A JSON success envelope with ``cgroup_path`` and ``pid``, or a JSON
        error envelope for a missing context, denied privilege, non-integer
        ``pid``, rejected path, non-directory target, or ``OSError`` raised by
        the write.
    """
    if ctx is None:
        return _json_err("No context.")
    pe = await _check_priv(ctx)
    if pe:
        return pe
    try:
        pid_num = int(pid)
    except (TypeError, ValueError, OverflowError):
        return _json_err("pid must be an integer.")
    # NOTE(review): the value is not range-checked here (linux_nsenter_exec
    # rejects pids < 1); the kernel decides how 0 or negatives are handled.
    # Confirm whether this handler should reject them as well.
    resolved, err = _resolve_under_cgroup(cgroup_path)
    if err:
        return _json_err(err)
    if not resolved.is_dir():
        return _json_err("cgroup_path must be an existing cgroup directory.")
    procs = resolved / "cgroup.procs"
    try:
        procs.write_text(f"{pid_num}\n", encoding="utf-8")
    except OSError as e:
        return _json_err(f"Could not write PID to cgroup.procs: {e}")
    return _json_ok(cgroup_path=str(resolved), pid=pid_num)
def _sanitize_flag_list(
flags: list[Any], name: str
) -> tuple[list[str] | None, str | None]:
"""Normalize a list of CLI flag tokens into properly-dashed options.
Validates that ``flags`` is a list of non-empty strings, then canonicalizes
each entry: tokens already starting with ``-`` are kept verbatim,
single-character tokens become short options (``"m"`` -> ``"-m"``), and
longer tokens become long options (``"mount"`` -> ``"--mount"``). This lets
the tool accept user-friendly flag names while still emitting a valid
``unshare``/``nsenter`` argument vector.
Pure validation/transformation; no I/O or side effects. Called by
:func:`linux_unshare_exec` (for ``unshare_flags``) and
:func:`linux_nsenter_exec` (for ``nsenter_flags``) before the flags are
spliced into the command passed to :func:`_run_exec_limited`.
Args:
flags: The candidate flag tokens to sanitize.
name: The parameter name (e.g. ``"unshare_flags"``) used to build a
descriptive validation error message.
Returns:
tuple[list[str] | None, str | None]: ``(normalized_flags, None)`` on
success, or ``(None, error_message)`` when ``flags`` is not a list or
contains a non-string / empty entry.
"""
if not isinstance(flags, list):
return None, f"{name} must be a list of strings."
out: list[str] = []
for x in flags:
if not isinstance(x, str) or not x:
return None, f"{name} entries must be non-empty strings."
if x.startswith("-"):
out.append(x)
elif len(x) == 1:
out.append(f"-{x}")
else:
out.append(f"--{x}")
return out, None
[docs]
async def linux_unshare_exec(
    command: list[str],
    unshare_flags: list[str] | None = None,
    timeout: int = _DEFAULT_SUBPROCESS_TIMEOUT,
    ctx: Any = None,
) -> str:
    """Run ``unshare [unshare_flags] -- command ...`` and report its result.

    The caller must pass ``_check_priv``. ``command`` must be a non-empty
    list (its entries are converted with ``str``), ``unshare_flags`` is
    normalized by ``_sanitize_flag_list``, the ``unshare`` binary is located
    with ``shutil.which`` and the assembled argument vector is executed by
    ``_run_exec_limited`` without a shell.

    Args:
        command: Program and arguments placed after ``--``.
        unshare_flags: Optional flag tokens; ``None`` means no flags.
        timeout: Seconds before the child is killed, clamped to
            ``[1, _MAX_SUBPROCESS_TIMEOUT]``.
        ctx: Invocation context; required for the privilege check.

    Returns:
        A JSON success envelope with ``returncode``, ``stdout``, ``stderr``
        and the full ``command`` that was run (this is also the shape used
        when the child fails or times out, with ``returncode`` ``-1``); or a
        JSON error envelope for a missing context, denied privilege, invalid
        ``command`` / flags / ``timeout``, or a missing ``unshare`` binary.
    """
    if ctx is None:
        return _json_err("No context.")
    pe = await _check_priv(ctx)
    if pe:
        return pe
    if not command or not isinstance(command, list):
        return _json_err("command must be a non-empty list of strings.")
    # `command` is a non-empty list here, so this is non-empty as well.
    cmd_parts = [str(x) for x in command]
    flags = unshare_flags if unshare_flags is not None else []
    flist, ferr = _sanitize_flag_list(flags, "unshare_flags")
    if ferr:
        return _json_err(ferr)
    try:
        timeout = max(1, min(int(timeout), _MAX_SUBPROCESS_TIMEOUT))
    except (TypeError, ValueError, OverflowError):
        # Report a malformed timeout instead of raising out of the handler.
        return _json_err("timeout must be an integer.")
    unshare_bin = shutil.which("unshare")
    if not unshare_bin:
        return _json_err("unshare executable not found on PATH (install util-linux).")
    full_cmd = [unshare_bin, *flist, "--", *cmd_parts]
    rc, out, err = await _run_exec_limited(full_cmd, timeout=timeout)
    return _json_ok(returncode=rc, stdout=out, stderr=err, command=full_cmd)
[docs]
async def linux_nsenter_exec(
    target_pid: int,
    command: list[str],
    nsenter_flags: list[str] | None = None,
    timeout: int = _DEFAULT_SUBPROCESS_TIMEOUT,
    ctx: Any = None,
) -> str:
    """Run ``nsenter -t <pid> [nsenter_flags] -- command ...`` and report its result.

    The caller must pass ``_check_priv``. ``target_pid`` must convert to a
    positive integer, ``command`` must be a non-empty list (its entries are
    converted with ``str``), ``nsenter_flags`` is normalized by
    ``_sanitize_flag_list``, the ``nsenter`` binary is located with
    ``shutil.which`` and the assembled argument vector is executed by
    ``_run_exec_limited`` without a shell.

    Args:
        target_pid: PID passed to ``nsenter -t``; must be >= 1.
        command: Program and arguments placed after ``--``.
        nsenter_flags: Optional flag tokens; ``None`` means no flags.
        timeout: Seconds before the child is killed, clamped to
            ``[1, _MAX_SUBPROCESS_TIMEOUT]``.
        ctx: Invocation context; required for the privilege check.

    Returns:
        A JSON success envelope with ``returncode``, ``stdout``, ``stderr``
        and the full ``command`` that was run (this is also the shape used
        when the child fails or times out, with ``returncode`` ``-1``); or a
        JSON error envelope for a missing context, denied privilege, invalid
        ``target_pid`` / ``command`` / flags / ``timeout``, or a missing
        ``nsenter`` binary.
    """
    if ctx is None:
        return _json_err("No context.")
    pe = await _check_priv(ctx)
    if pe:
        return pe
    try:
        pid = int(target_pid)
    except (TypeError, ValueError, OverflowError):
        # A non-numeric pid gets the same message as an out-of-range one.
        return _json_err("target_pid must be a positive integer.")
    if pid < 1:
        return _json_err("target_pid must be a positive integer.")
    if not command or not isinstance(command, list):
        return _json_err("command must be a non-empty list of strings.")
    # `command` is a non-empty list here, so this is non-empty as well.
    cmd_parts = [str(x) for x in command]
    flags = nsenter_flags if nsenter_flags is not None else []
    flist, ferr = _sanitize_flag_list(flags, "nsenter_flags")
    if ferr:
        return _json_err(ferr)
    try:
        timeout = max(1, min(int(timeout), _MAX_SUBPROCESS_TIMEOUT))
    except (TypeError, ValueError, OverflowError):
        # Report a malformed timeout instead of raising out of the handler.
        return _json_err("timeout must be an integer.")
    nsenter_bin = shutil.which("nsenter")
    if not nsenter_bin:
        return _json_err("nsenter executable not found on PATH (install util-linux).")
    full_cmd = [nsenter_bin, "-t", str(pid), *flist, "--", *cmd_parts]
    rc, out, err = await _run_exec_limited(full_cmd, timeout=timeout)
    return _json_ok(returncode=rc, stdout=out, stderr=err, command=full_cmd)
# ---------------------------------------------------------------------------
# TOOLS
# ---------------------------------------------------------------------------
# Tool registry for this module. Each entry carries the tool ``name``, a
# human-readable ``description``, a JSON-Schema style ``parameters`` object
# and the coroutine ``handler`` that implements it. The ``ctx`` argument of
# the handlers is not part of any schema.
# NOTE(review): presumably consumed by the project's tool loader/dispatcher,
# which would supply ``ctx`` -- confirm against the loader.
TOOLS = [
    {
        "name": "cgroup_read_file",
        "description": (
            "Read a cgroup v2 pseudo-file under /sys/fs/cgroup. "
            "Path may be absolute or relative to the cgroup mount. "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Path to the file under the cgroup hierarchy.",
                },
                "max_bytes": {
                    "type": "integer",
                    "description": f"Max bytes to read (default {_MAX_READ_BYTES_DEFAULT}, max {_MAX_READ_BYTES_CAP}).",
                },
            },
            "required": ["path"],
        },
        "handler": cgroup_read_file,
    },
    {
        "name": "cgroup_write_file",
        "description": (
            "Write text to a cgroup pseudo-file (e.g. cgroup.subtree_control, memory.max). "
            "Path must resolve under /sys/fs/cgroup. Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {"type": "string", "description": "Target file path."},
                "content": {"type": "string", "description": "UTF-8 text to write."},
            },
            "required": ["path", "content"],
        },
        "handler": cgroup_write_file,
    },
    {
        "name": "cgroup_mkdir",
        "description": (
            "Create a cgroup subdirectory (equivalent to mkdir -p when parents is true). "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Cgroup directory to create.",
                },
                "parents": {
                    "type": "boolean",
                    "description": "If true, create parent directories as needed (default true).",
                },
            },
            "required": ["path"],
        },
        "handler": cgroup_mkdir,
    },
    {
        "name": "cgroup_rmdir",
        "description": (
            "Remove an empty cgroup directory under /sys/fs/cgroup. "
            "Cannot remove the mount root. Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "path": {
                    "type": "string",
                    "description": "Empty cgroup directory to remove.",
                },
            },
            "required": ["path"],
        },
        "handler": cgroup_rmdir,
    },
    {
        "name": "cgroup_move_pid",
        "description": (
            "Move a process into a cgroup by appending its PID to cgroup.procs. "
            "cgroup_path is the leaf cgroup directory. Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "cgroup_path": {
                    "type": "string",
                    "description": "Path to the target cgroup directory.",
                },
                "pid": {"type": "integer", "description": "Process ID to move."},
            },
            "required": ["cgroup_path", "pid"],
        },
        "handler": cgroup_move_pid,
    },
    {
        "name": "linux_unshare_exec",
        "description": (
            "Run a command in new namespace(s) using unshare(1) from util-linux: "
            "unshare [unshare_flags] -- command. Pass flags as short or long options "
            "(e.g. --mount, --pid, --fork). Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "command": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Executable and arguments after '--'.",
                },
                "unshare_flags": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Options for unshare (e.g. m, u, i, n, p, U or long names).",
                },
                "timeout": {
                    "type": "integer",
                    "description": f"Seconds (default {_DEFAULT_SUBPROCESS_TIMEOUT}, max {_MAX_SUBPROCESS_TIMEOUT}).",
                },
            },
            "required": ["command"],
        },
        "handler": linux_unshare_exec,
    },
    {
        "name": "linux_nsenter_exec",
        "description": (
            "Run a command in another task's namespaces using nsenter(1): "
            "nsenter -t pid [nsenter_flags] -- command. "
            "Requires UNSANDBOXED_EXEC."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "target_pid": {"type": "integer", "description": "Target process ID."},
                "command": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Executable and arguments after '--'.",
                },
                "nsenter_flags": {
                    "type": "array",
                    "items": {"type": "string"},
                    "description": "Namespace options (e.g. --mount, --net, --all).",
                },
                "timeout": {
                    "type": "integer",
                    "description": f"Seconds (default {_DEFAULT_SUBPROCESS_TIMEOUT}, max {_MAX_SUBPROCESS_TIMEOUT}).",
                },
            },
            "required": ["target_pid", "command"],
        },
        "handler": linux_nsenter_exec,
    },
]