Source code for tools.linux_cgroup_ns_tools

"""Direct Linux cgroup v2 and namespace management.

Read/write cgroup pseudo-files under the unified hierarchy, create/remove cgroup
directories, move processes via ``cgroup.procs``, and run commands under
``unshare`` or ``nsenter`` (util-linux). All operations require the
``UNSANDBOXED_EXEC`` privilege.

Paths are always canonicalized and must resolve under ``/sys/fs/cgroup`` (no
``..`` escapes to other filesystem trees).
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import shutil
from pathlib import Path
from typing import Any

from tools._stargazer_pid_cgroup import CGROUP_MOUNT

logger = logging.getLogger(__name__)

_MAX_READ_BYTES_DEFAULT = 256 * 1024
_MAX_READ_BYTES_CAP = 512 * 1024
_MAX_IO_CAPTURE = 256 * 1024
_DEFAULT_SUBPROCESS_TIMEOUT = 120
_MAX_SUBPROCESS_TIMEOUT = 600


async def _check_priv(ctx: Any) -> str | None:
    """Verify the calling user holds the ``UNSANDBOXED_EXEC`` privilege.

    Acts as the authorization gate for every handler in this module: cgroup
    pseudo-file I/O and ``unshare``/``nsenter`` execution all run with the
    host's elevated capabilities, so each must pass this check first. Returns
    ``None`` to mean "allowed" and a serialized JSON error string to mean
    "denied", letting callers short-circuit with a single ``if pe: return pe``.

    Lazily imports :data:`tools.alter_privileges.PRIVILEGES` and
    :func:`tools.alter_privileges.has_privilege`, reads ``ctx.redis``,
    ``ctx.config`` and ``ctx.user_id`` off the :class:`ToolContext`, and awaits
    ``has_privilege`` to test the ``UNSANDBOXED_EXEC`` bit against the user's
    Redis-backed privilege mask. Performs no writes; only the privilege lookup
    touches Redis. Called by all seven tool handlers in this module
    (``cgroup_read_file``, ``cgroup_write_file``, ``cgroup_mkdir``,
    ``cgroup_rmdir``, ``cgroup_move_pid``, ``linux_unshare_exec`` and
    ``linux_nsenter_exec``) immediately after their ``ctx is None`` guard.

    Args:
        ctx: The :class:`ToolContext` for the invocation; ``redis``, ``config``
            and ``user_id`` attributes are read from it (missing attributes are
            treated as absent / empty).

    Returns:
        str | None: ``None`` when the user has the privilege; otherwise a
        JSON-encoded ``{"success": False, "error": ...}`` string describing the
        denial (missing privilege) or that the privilege system is unavailable.
    """
    try:
        from tools.alter_privileges import PRIVILEGES, has_privilege

        redis = getattr(ctx, "redis", None)
        config = getattr(ctx, "config", None)
        user_id = getattr(ctx, "user_id", "") or ""
        if not await has_privilege(
            redis, user_id, PRIVILEGES["UNSANDBOXED_EXEC"], config
        ):
            return json.dumps(
                {
                    "success": False,
                    "error": (
                        "The user does not have the UNSANDBOXED_EXEC privilege. "
                        "Ask an admin to grant it with the alter_privileges tool."
                    ),
                }
            )
    except ImportError:
        return json.dumps({"success": False, "error": "Privilege system unavailable."})
    return None


def _cgroup_mount_resolved() -> Path:
    """Return the fully resolved absolute path of the cgroup v2 mount root.

    Resolves :data:`tools._stargazer_pid_cgroup.CGROUP_MOUNT`
    (``/sys/fs/cgroup``) through symlinks so it can be compared against
    canonicalized request paths. Used as the trusted boundary for confinement:
    :func:`_resolve_under_cgroup` requires every target to live under this
    directory, and :func:`cgroup_rmdir` compares against it to refuse removing
    the mount root itself.

    Performs a filesystem ``resolve`` (a stat-level lookup) but no mutation.

    Returns:
        Path: The canonical absolute path to the cgroup mount root.
    """
    return CGROUP_MOUNT.resolve()


def _resolve_under_cgroup(raw: str) -> tuple[Path | None, str | None]:
    """Canonicalize a request path and confine it under the cgroup mount.

    Acts as the security boundary shared by every cgroup pseudo-file handler:
    a relative path is anchored under :data:`CGROUP_MOUNT`
    (``/sys/fs/cgroup``), an absolute path is taken as-is, and the result is
    fully resolved through symlinks before being checked against the resolved
    mount root from :func:`_cgroup_mount_resolved`. This is what prevents
    ``..`` traversal or symlink tricks from reaching arbitrary files outside
    the unified hierarchy.

    Performs a filesystem ``resolve`` (a stat-level lookup) but no mutation.
    Called by :func:`cgroup_read_file`, :func:`cgroup_write_file`,
    :func:`cgroup_mkdir`, :func:`cgroup_rmdir` and :func:`cgroup_move_pid` to
    validate their ``path`` / ``cgroup_path`` argument before any I/O.

    Args:
        raw: The user-supplied path string; whitespace is stripped and an
            empty value is rejected.

    Returns:
        tuple[Path | None, str | None]: ``(resolved_path, None)`` when the path
        is non-empty, resolvable, and lives under the cgroup mount; otherwise
        ``(None, error_message)`` for an empty path, an ``OSError`` during
        resolution, or a target that escapes the mount root.
    """
    s = (raw or "").strip()
    if not s:
        return None, "Empty path."
    p = Path(s)
    if not p.is_absolute():
        p = CGROUP_MOUNT / p
    try:
        resolved = p.resolve()
    except OSError as e:
        return None, f"Could not resolve path: {e}"
    mount = _cgroup_mount_resolved()
    try:
        resolved.relative_to(mount)
    except ValueError:
        return None, f"Path must be under cgroup mount {mount}."
    return resolved, None


async def _run_exec_limited(
    cmd: list[str],
    *,
    timeout: int = _DEFAULT_SUBPROCESS_TIMEOUT,
) -> tuple[int, str, str]:
    """Run a command via ``execvp`` with a hard timeout and bounded output.

    Spawns ``cmd`` directly with :func:`asyncio.create_subprocess_exec` (no
    shell, so the argument list is passed verbatim and not word-split),
    captures stdout/stderr, and enforces ``timeout`` with
    :func:`asyncio.wait_for`. On timeout the child is killed and a synthetic
    failure is returned; captured streams are decoded with ``errors="replace"``
    and each is truncated to :data:`_MAX_IO_CAPTURE` bytes with a
    ``"... [truncated]"`` marker to cap memory and response size.

    The only side effect is the spawned subprocess; errors during spawn
    (``FileNotFoundError`` for a missing executable, or any other exception,
    which is logged via :data:`logger`) are converted into a ``-1`` return code
    rather than propagated. Called by :func:`linux_unshare_exec` and
    :func:`linux_nsenter_exec` to execute the assembled ``unshare``/``nsenter``
    command lines.

    Args:
        cmd: Full argument vector to execute; ``cmd[0]`` is the program path.
        timeout: Maximum wall-clock seconds to allow before killing the child.

    Returns:
        tuple[int, str, str]: ``(returncode, stdout, stderr)`` where both
        strings are stripped and truncated; ``returncode`` is ``-1`` on
        timeout, missing executable, or other spawn failure (and is also
        normalized to ``-1`` if the process reports ``None``).
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout_b, stderr_b = await asyncio.wait_for(
            proc.communicate(),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            pass
        return -1, "", f"Command timed out after {timeout}s."
    except FileNotFoundError as exc:
        return -1, "", f"Executable not found: {exc}"
    except Exception as exc:
        logger.error("_run_exec_limited %s: %s", cmd, exc, exc_info=True)
        return -1, "", str(exc)

    out = stdout_b.decode(errors="replace")
    err = stderr_b.decode(errors="replace")
    if len(out) > _MAX_IO_CAPTURE:
        out = out[:_MAX_IO_CAPTURE] + "\n... [truncated]"
    if len(err) > _MAX_IO_CAPTURE:
        err = err[:_MAX_IO_CAPTURE] + "\n... [truncated]"
    rc = proc.returncode
    if rc is None:
        rc = -1
    return rc, out.strip(), err.strip()


def _json_ok(**extra: Any) -> str:
    """Serialize a success envelope to a JSON string.

    Builds ``{"success": True, **extra}`` and dumps it via the module's
    ``jsonutil`` (imported as ``json``), giving every handler a uniform
    success shape on top of which it layers result fields (e.g. ``path``,
    ``content``, ``returncode``). Pure formatting; no I/O or side effects.
    Called on the success path of all seven tool handlers in this module.

    Args:
        **extra: Arbitrary result fields merged into the envelope alongside
            ``success``.

    Returns:
        str: The JSON-encoded success envelope.
    """
    payload: dict[str, Any] = {"success": True, **extra}
    return json.dumps(payload)


def _json_err(msg: str) -> str:
    """Serialize a failure envelope to a JSON string.

    Dumps ``{"success": False, "error": msg}`` via ``jsonutil`` so every
    handler reports errors in one consistent shape. Pure formatting; no I/O or
    side effects. Called throughout this module for missing context, path
    validation failures, and OS errors from the cgroup/subprocess operations.

    Args:
        msg: Human-readable error message placed under the ``error`` key.

    Returns:
        str: The JSON-encoded failure envelope.
    """
    return json.dumps({"success": False, "error": msg})


# ---------------------------------------------------------------------------
# Handlers
# ---------------------------------------------------------------------------


[docs] async def cgroup_read_file( path: str, max_bytes: int = _MAX_READ_BYTES_DEFAULT, ctx: Any = None, ) -> str: """Read a cgroup v2 pseudo-file under the unified hierarchy as text. Handler for the ``cgroup_read_file`` tool: it lets a privileged user inspect cgroup controller state (e.g. ``memory.current``, ``cgroup.controllers``) by reading the pseudo-file at ``path``. The read is bounded so a large or pathological file cannot blow up the response. Requires the ``UNSANDBOXED_EXEC`` privilege, enforced via :func:`_check_priv`. The ``path`` is confined to ``/sys/fs/cgroup`` by :func:`_resolve_under_cgroup`, then the bytes are read directly off the filesystem and decoded with ``errors="replace"``, truncating past ``max_bytes`` (capped at :data:`_MAX_READ_BYTES_CAP`). Results are wrapped with :func:`_json_ok` / :func:`_json_err`. Invoked through the tool dispatcher via the ``handler`` entry in this module's ``TOOLS`` list (see ``tool_loader.py``); no direct in-repo callers. Args: path: Path to the pseudo-file, absolute or relative to the cgroup mount. max_bytes: Maximum number of bytes to return, clamped to the range ``[1, _MAX_READ_BYTES_CAP]``. ctx: The :class:`ToolContext`; required for the privilege check. Returns: str: A JSON success envelope with ``path``, ``content`` and ``bytes_read``, or a JSON error envelope on a missing context, denied privilege, path-validation failure, or ``OSError``. """ if ctx is None: return _json_err("No context.") pe = await _check_priv(ctx) if pe: return pe max_bytes = max(1, min(int(max_bytes), _MAX_READ_BYTES_CAP)) resolved, err = _resolve_under_cgroup(path) if err: return _json_err(err) try: data = resolved.read_bytes() except OSError as e: return _json_err(f"Read failed: {e}") if len(data) > max_bytes: text = data[:max_bytes].decode(errors="replace") + "\n... [truncated]" else: text = data.decode(errors="replace") return _json_ok( path=str(resolved), content=text, bytes_read=min(len(data), max_bytes) )
[docs] async def cgroup_write_file( path: str, content: str, ctx: Any = None, ) -> str: """Write UTF-8 text into a cgroup v2 pseudo-file (a control operation). Handler for the ``cgroup_write_file`` tool: this is how the bot actually reconfigures a cgroup, since cgroup v2 is driven entirely by writes to interface files (e.g. enabling controllers via ``cgroup.subtree_control`` or setting a limit via ``memory.max``). The write goes straight to the kernel-backed pseudo-file, so its semantics depend on which file is targeted. Requires the ``UNSANDBOXED_EXEC`` privilege via :func:`_check_priv`, and the ``path`` is confined under ``/sys/fs/cgroup`` by :func:`_resolve_under_cgroup` before :meth:`pathlib.Path.write_text` is called. Envelopes come from :func:`_json_ok` / :func:`_json_err`. Invoked through the tool dispatcher via the ``handler`` entry in this module's ``TOOLS`` list; no direct in-repo callers. Args: path: Target pseudo-file path, absolute or relative to the cgroup mount. content: UTF-8 text to write verbatim to the file. ctx: The :class:`ToolContext`; required for the privilege check. Returns: str: A JSON success envelope with ``path`` and ``bytes_written``, or a JSON error envelope on a missing context, denied privilege, path-validation failure, or ``OSError`` from the write. """ if ctx is None: return _json_err("No context.") pe = await _check_priv(ctx) if pe: return pe resolved, err = _resolve_under_cgroup(path) if err: return _json_err(err) try: resolved.write_text(content, encoding="utf-8") except OSError as e: return _json_err(f"Write failed: {e}") return _json_ok(path=str(resolved), bytes_written=len(content.encode("utf-8")))
[docs] async def cgroup_mkdir( path: str, parents: bool = True, ctx: Any = None, ) -> str: """Create a cgroup directory, optionally with its parents. Handler for the ``cgroup_mkdir`` tool. Creating a directory under the unified hierarchy is how a new child cgroup is materialized: the kernel auto-populates it with the relevant ``cgroup.*`` interface files, after which processes can be moved in and limits applied. With ``parents`` true this behaves like ``mkdir -p`` and is idempotent (``exist_ok=True``). Requires the ``UNSANDBOXED_EXEC`` privilege via :func:`_check_priv`; the ``path`` is confined under ``/sys/fs/cgroup`` by :func:`_resolve_under_cgroup` before :meth:`pathlib.Path.mkdir`. Results use :func:`_json_ok` / :func:`_json_err`. Invoked through the tool dispatcher via the ``handler`` entry in this module's ``TOOLS`` list; no direct in-repo callers. Args: path: Directory to create, absolute or relative to the cgroup mount. parents: When true, create any missing parent directories as well. ctx: The :class:`ToolContext`; required for the privilege check. Returns: str: A JSON success envelope with the created ``path``, or a JSON error envelope on a missing context, denied privilege, path-validation failure, or ``OSError`` from the mkdir. """ if ctx is None: return _json_err("No context.") pe = await _check_priv(ctx) if pe: return pe resolved, err = _resolve_under_cgroup(path) if err: return _json_err(err) try: resolved.mkdir(parents=parents, exist_ok=True) except OSError as e: return _json_err(f"mkdir failed: {e}") return _json_ok(path=str(resolved))
[docs] async def cgroup_rmdir( path: str, ctx: Any = None, ) -> str: """Remove an empty cgroup directory under the unified hierarchy. Handler for the ``cgroup_rmdir`` tool, the inverse of :func:`cgroup_mkdir`. The kernel only allows an empty cgroup to be removed, so this cleans up a child cgroup once every process has been migrated out of it. As an extra guard it refuses to delete the mount root itself. Requires the ``UNSANDBOXED_EXEC`` privilege via :func:`_check_priv`; the ``path`` is confined under ``/sys/fs/cgroup`` by :func:`_resolve_under_cgroup` and compared against :func:`_cgroup_mount_resolved` before :meth:`pathlib.Path.rmdir`. Results use :func:`_json_ok` / :func:`_json_err`. Invoked through the tool dispatcher via the ``handler`` entry in this module's ``TOOLS`` list; no direct in-repo callers. Args: path: Empty directory to remove, absolute or relative to the cgroup mount. ctx: The :class:`ToolContext`; required for the privilege check. Returns: str: A JSON success envelope with the removed ``path``, or a JSON error envelope on a missing context, denied privilege, path-validation failure, an attempt to remove the mount root, or an ``OSError`` (notably when the directory is non-empty). """ if ctx is None: return _json_err("No context.") pe = await _check_priv(ctx) if pe: return pe resolved, err = _resolve_under_cgroup(path) if err: return _json_err(err) if resolved == _cgroup_mount_resolved(): return _json_err("Refusing to remove the cgroup mount root.") try: resolved.rmdir() except OSError as e: return _json_err(f"rmdir failed (directory must be empty): {e}") return _json_ok(path=str(resolved))
[docs] async def cgroup_move_pid( cgroup_path: str, pid: int, ctx: Any = None, ) -> str: """Move a process into a cgroup by writing its PID to ``cgroup.procs``. Handler for the ``cgroup_move_pid`` tool. In cgroup v2 a task is assigned to a cgroup by writing its PID into that cgroup's ``cgroup.procs`` file; this is the step that actually subjects a running process to the cgroup's controllers and limits. The target directory must already exist (created via :func:`cgroup_mkdir`). Requires the ``UNSANDBOXED_EXEC`` privilege via :func:`_check_priv`. The ``cgroup_path`` is confined under ``/sys/fs/cgroup`` by :func:`_resolve_under_cgroup`, checked to be an existing directory, and the PID is written to ``cgroup.procs`` inside it. Results use :func:`_json_ok` / :func:`_json_err`. Invoked through the tool dispatcher via the ``handler`` entry in this module's ``TOOLS`` list; no direct in-repo callers. Args: cgroup_path: Existing target cgroup directory, absolute or relative to the cgroup mount. pid: Process ID to move into the cgroup. ctx: The :class:`ToolContext`; required for the privilege check. Returns: str: A JSON success envelope with ``cgroup_path`` and ``pid``, or a JSON error envelope on a missing context, denied privilege, path-validation failure, a non-directory target, or an ``OSError`` from the write. """ if ctx is None: return _json_err("No context.") pe = await _check_priv(ctx) if pe: return pe resolved, err = _resolve_under_cgroup(cgroup_path) if err: return _json_err(err) if not resolved.is_dir(): return _json_err("cgroup_path must be an existing cgroup directory.") procs = resolved / "cgroup.procs" try: procs.write_text(f"{int(pid)}\n", encoding="utf-8") except OSError as e: return _json_err(f"Could not write PID to cgroup.procs: {e}") return _json_ok(cgroup_path=str(resolved), pid=int(pid))
def _sanitize_flag_list( flags: list[Any], name: str ) -> tuple[list[str] | None, str | None]: """Normalize a list of CLI flag tokens into properly-dashed options. Validates that ``flags`` is a list of non-empty strings, then canonicalizes each entry: tokens already starting with ``-`` are kept verbatim, single-character tokens become short options (``"m"`` -> ``"-m"``), and longer tokens become long options (``"mount"`` -> ``"--mount"``). This lets the tool accept user-friendly flag names while still emitting a valid ``unshare``/``nsenter`` argument vector. Pure validation/transformation; no I/O or side effects. Called by :func:`linux_unshare_exec` (for ``unshare_flags``) and :func:`linux_nsenter_exec` (for ``nsenter_flags``) before the flags are spliced into the command passed to :func:`_run_exec_limited`. Args: flags: The candidate flag tokens to sanitize. name: The parameter name (e.g. ``"unshare_flags"``) used to build a descriptive validation error message. Returns: tuple[list[str] | None, str | None]: ``(normalized_flags, None)`` on success, or ``(None, error_message)`` when ``flags`` is not a list or contains a non-string / empty entry. """ if not isinstance(flags, list): return None, f"{name} must be a list of strings." out: list[str] = [] for x in flags: if not isinstance(x, str) or not x: return None, f"{name} entries must be non-empty strings." if x.startswith("-"): out.append(x) elif len(x) == 1: out.append(f"-{x}") else: out.append(f"--{x}") return out, None
[docs] async def linux_unshare_exec( command: list[str], unshare_flags: list[str] | None = None, timeout: int = _DEFAULT_SUBPROCESS_TIMEOUT, ctx: Any = None, ) -> str: """Run a command in fresh Linux namespaces via ``unshare`` (util-linux). Handler for the ``linux_unshare_exec`` tool. It assembles and runs ``unshare [unshare_flags] -- command ...``, letting a privileged user execute a process inside newly unshared namespaces (mount, PID, net, user, etc.) for isolation or experimentation. The flag tokens are accepted in a forgiving form and normalized to real options. Requires the ``UNSANDBOXED_EXEC`` privilege via :func:`_check_priv`, validates that ``command`` is a non-empty list, normalizes ``unshare_flags`` through :func:`_sanitize_flag_list`, resolves the ``unshare`` binary with :func:`shutil.which`, clamps ``timeout`` to :data:`_MAX_SUBPROCESS_TIMEOUT`, and executes the assembled argv via :func:`_run_exec_limited` (no shell). Results use :func:`_json_ok` / :func:`_json_err`. Invoked through the tool dispatcher via the ``handler`` entry in this module's ``TOOLS`` list; no direct in-repo callers. Args: command: Executable and arguments to run after ``--``; must be a non-empty list of strings. unshare_flags: Optional namespace flags, accepted as bare names or short/long options and normalized before use. timeout: Wall-clock seconds before the child is killed, clamped to ``[1, _MAX_SUBPROCESS_TIMEOUT]``. ctx: The :class:`ToolContext`; required for the privilege check. Returns: str: A JSON success envelope with ``returncode``, ``stdout``, ``stderr`` and the full ``command``, or a JSON error envelope on a missing context, denied privilege, invalid arguments, or a missing ``unshare`` executable. """ if ctx is None: return _json_err("No context.") pe = await _check_priv(ctx) if pe: return pe if not command or not isinstance(command, list): return _json_err("command must be a non-empty list of strings.") cmd_parts = [str(x) for x in command] if not cmd_parts: return _json_err("command must not be empty.") flags = unshare_flags if unshare_flags is not None else [] flist, ferr = _sanitize_flag_list(flags, "unshare_flags") if ferr: return _json_err(ferr) unshare_bin = shutil.which("unshare") if not unshare_bin: return _json_err("unshare executable not found on PATH (install util-linux).") timeout = max(1, min(int(timeout), _MAX_SUBPROCESS_TIMEOUT)) full_cmd = [unshare_bin, *flist, "--", *cmd_parts] rc, out, err = await _run_exec_limited(full_cmd, timeout=timeout) return _json_ok(returncode=rc, stdout=out, stderr=err, command=full_cmd)
[docs] async def linux_nsenter_exec( target_pid: int, command: list[str], nsenter_flags: list[str] | None = None, timeout: int = _DEFAULT_SUBPROCESS_TIMEOUT, ctx: Any = None, ) -> str: """Run a command inside another process's namespaces via ``nsenter``. Handler for the ``linux_nsenter_exec`` tool. It assembles and runs ``nsenter -t <pid> [nsenter_flags] -- command ...`` (util-linux), so a privileged user can join an existing task's namespaces -- for example to debug inside a container by entering its mount and network namespaces. This is the counterpart to :func:`linux_unshare_exec`, which instead creates new namespaces. Requires the ``UNSANDBOXED_EXEC`` privilege via :func:`_check_priv`, validates ``target_pid`` is positive and ``command`` is a non-empty list, normalizes ``nsenter_flags`` through :func:`_sanitize_flag_list`, resolves the ``nsenter`` binary with :func:`shutil.which`, clamps ``timeout`` to :data:`_MAX_SUBPROCESS_TIMEOUT`, and executes the assembled argv via :func:`_run_exec_limited` (no shell). Results use :func:`_json_ok` / :func:`_json_err`. Invoked through the tool dispatcher via the ``handler`` entry in this module's ``TOOLS`` list; no direct in-repo callers. Args: target_pid: PID of the process whose namespaces to enter; must be a positive integer. command: Executable and arguments to run after ``--``; must be a non-empty list of strings. nsenter_flags: Optional namespace flags, accepted as bare names or short/long options and normalized before use. timeout: Wall-clock seconds before the child is killed, clamped to ``[1, _MAX_SUBPROCESS_TIMEOUT]``. ctx: The :class:`ToolContext`; required for the privilege check. Returns: str: A JSON success envelope with ``returncode``, ``stdout``, ``stderr`` and the full ``command``, or a JSON error envelope on a missing context, denied privilege, invalid arguments, or a missing ``nsenter`` executable. """ if ctx is None: return _json_err("No context.") pe = await _check_priv(ctx) if pe: return pe pid = int(target_pid) if pid < 1: return _json_err("target_pid must be a positive integer.") if not command or not isinstance(command, list): return _json_err("command must be a non-empty list of strings.") cmd_parts = [str(x) for x in command] if not cmd_parts: return _json_err("command must not be empty.") flags = nsenter_flags if nsenter_flags is not None else [] flist, ferr = _sanitize_flag_list(flags, "nsenter_flags") if ferr: return _json_err(ferr) nsenter_bin = shutil.which("nsenter") if not nsenter_bin: return _json_err("nsenter executable not found on PATH (install util-linux).") timeout = max(1, min(int(timeout), _MAX_SUBPROCESS_TIMEOUT)) full_cmd = [nsenter_bin, "-t", str(pid), *flist, "--", *cmd_parts] rc, out, err = await _run_exec_limited(full_cmd, timeout=timeout) return _json_ok(returncode=rc, stdout=out, stderr=err, command=full_cmd)
# --------------------------------------------------------------------------- # TOOLS # --------------------------------------------------------------------------- TOOLS = [ { "name": "cgroup_read_file", "description": ( "Read a cgroup v2 pseudo-file under /sys/fs/cgroup. " "Path may be absolute or relative to the cgroup mount. " "Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Path to the file under the cgroup hierarchy.", }, "max_bytes": { "type": "integer", "description": f"Max bytes to read (default {_MAX_READ_BYTES_DEFAULT}, max {_MAX_READ_BYTES_CAP}).", }, }, "required": ["path"], }, "handler": cgroup_read_file, }, { "name": "cgroup_write_file", "description": ( "Write text to a cgroup pseudo-file (e.g. cgroup.subtree_control, memory.max). " "Path must resolve under /sys/fs/cgroup. Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "path": {"type": "string", "description": "Target file path."}, "content": {"type": "string", "description": "UTF-8 text to write."}, }, "required": ["path", "content"], }, "handler": cgroup_write_file, }, { "name": "cgroup_mkdir", "description": ( "Create a cgroup subdirectory (equivalent to mkdir -p when parents is true). " "Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Cgroup directory to create.", }, "parents": { "type": "boolean", "description": "If true, create parent directories as needed (default true).", }, }, "required": ["path"], }, "handler": cgroup_mkdir, }, { "name": "cgroup_rmdir", "description": ( "Remove an empty cgroup directory under /sys/fs/cgroup. " "Cannot remove the mount root. Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "path": { "type": "string", "description": "Empty cgroup directory to remove.", }, }, "required": ["path"], }, "handler": cgroup_rmdir, }, { "name": "cgroup_move_pid", "description": ( "Move a process into a cgroup by appending its PID to cgroup.procs. " "cgroup_path is the leaf cgroup directory. Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "cgroup_path": { "type": "string", "description": "Path to the target cgroup directory.", }, "pid": {"type": "integer", "description": "Process ID to move."}, }, "required": ["cgroup_path", "pid"], }, "handler": cgroup_move_pid, }, { "name": "linux_unshare_exec", "description": ( "Run a command in new namespace(s) using unshare(1) from util-linux: " "unshare [unshare_flags] -- command. Pass flags as short or long options " "(e.g. --mount, --pid, --fork). Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "command": { "type": "array", "items": {"type": "string"}, "description": "Executable and arguments after '--'.", }, "unshare_flags": { "type": "array", "items": {"type": "string"}, "description": "Options for unshare (e.g. m, u, i, n, p, U or long names).", }, "timeout": { "type": "integer", "description": f"Seconds (default {_DEFAULT_SUBPROCESS_TIMEOUT}, max {_MAX_SUBPROCESS_TIMEOUT}).", }, }, "required": ["command"], }, "handler": linux_unshare_exec, }, { "name": "linux_nsenter_exec", "description": ( "Run a command in another task's namespaces using nsenter(1): " "nsenter -t pid [nsenter_flags] -- command. " "Requires UNSANDBOXED_EXEC." ), "parameters": { "type": "object", "properties": { "target_pid": {"type": "integer", "description": "Target process ID."}, "command": { "type": "array", "items": {"type": "string"}, "description": "Executable and arguments after '--'.", }, "nsenter_flags": { "type": "array", "items": {"type": "string"}, "description": "Namespace options (e.g. --mount, --net, --all).", }, "timeout": { "type": "integer", "description": f"Seconds (default {_DEFAULT_SUBPROCESS_TIMEOUT}, max {_MAX_SUBPROCESS_TIMEOUT}).", }, }, "required": ["target_pid", "command"], }, "handler": linux_nsenter_exec, }, ]