# Source code for core.ops_exec

"""Shared OS-level operation primitives for admin / cluster control commands.

This module is the canonical home for the hardened subprocess, git-environment,
cooldown and systemd-restart helpers used by both:

- :mod:`message_processor.admin_ops_commands` (the legacy single-service
  ``!bot_restart`` / ``!proxy_restart`` / ``!bot_pull`` worker path), and
- :mod:`core.control_ops` (the fleet-wide control-ops daemon + publisher).

It is **stdlib-only** on purpose: ``message_processor.admin_ops_commands`` is
loaded in isolation by ``tests/test_admin_ops_commands.py`` (by file path, with
the heavy app stack stubbed), and it imports the primitives below — so this
module must never pull in redis / discord / config at import time. The Redis
client used for the git-pull dedupe lock is always passed in as an argument.

Security hardening carried over from admin_ops_commands (2026-04-28):
    - ``asyncio.create_subprocess_exec`` (no ``shell=True``) — no injection.
    - Per-command cooldown to prevent rapid-repeat DoS.
    - Subprocess output truncated to a fixed byte cap per stream.
    - git pull kills the subprocess on timeout (no orphaned index.lock).
    - git pull uses an environment allowlist (no secret leakage to git hooks).
"""

from __future__ import annotations

import asyncio
import logging
import os
import socket
import time
from typing import NamedTuple, Optional

# Module-level logger. It uses the fixed name "ops_exec" rather than __name__
# (presumably "core.ops_exec"). NOTE(review): confirm that no logging config
# or log filter keys on "ops_exec" before switching to __name__.
logger = logging.getLogger("ops_exec")

# ──────────────────────────────────────────────
# Subprocess helper
# ──────────────────────────────────────────────

# Maximum bytes kept from each of a subprocess's stdout / stderr streams.
# Anything beyond this is read and discarded rather than buffered. Draining
# means the child never stalls on a full pipe; discarding keeps memory bounded
# no matter how much the child prints.
_MAX_PIPE_BYTES: int = 16_384  # 16 KiB

# Chunk size used while draining a pipe.
_PIPE_READ_CHUNK: int = 65_536


async def _read_capped(
    stream: Optional[asyncio.StreamReader], limit: int = _MAX_PIPE_BYTES
) -> bytes:
    """Read *stream* to EOF, returning only its first *limit* bytes.

    Returns ``b""`` when *stream* is ``None`` (the pipe was not captured).
    """
    if stream is None:
        return b""
    kept = bytearray()
    while True:
        chunk = await stream.read(_PIPE_READ_CHUNK)
        if not chunk:  # EOF: the child closed its end of the pipe.
            return bytes(kept)
        room = limit - len(kept)
        if room > 0:
            kept += chunk[:room]
        # Once the cap is reached, keep draining (and dropping) data so the
        # child can finish writing and exit.


async def _run_subprocess(*args: str, timeout: float = 30.0) -> tuple[int, str, str]:
    """Run a subprocess and return ``(returncode, stdout, stderr)``.

    *args* are passed directly to exec, with no shell, so there is no
    expansion or injection.

    Each output stream is handled as follows:
    - truncated to :data:`_MAX_PIPE_BYTES` while it is being read;
    - decoded as UTF-8, with undecodable bytes replaced;
    - stripped of surrounding whitespace.

    ``returncode`` is ``-1`` if the child's exit status is unavailable.

    Raises:
        asyncio.TimeoutError: The child ran longer than *timeout* seconds.
            The child is killed and reaped before the error propagates.

    If the calling task is cancelled, or reading the pipes fails, the child
    is killed before the exception propagates, so it is never left running.
    """
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    async def _collect() -> tuple[bytes, bytes]:
        # Drain both pipes concurrently so neither can fill up and block the
        # child. Then wait for it to exit so returncode is populated.
        out_b, err_b = await asyncio.gather(
            _read_capped(proc.stdout), _read_capped(proc.stderr)
        )
        await proc.wait()
        return out_b, err_b

    try:
        stdout_b, stderr_b = await asyncio.wait_for(_collect(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            pass  # The child exited on its own right at the deadline.
        # Drain what is left so the pipe transports close and the child is reaped.
        await proc.communicate()
        raise
    except BaseException:
        # Cancellation or an unexpected read error. Kill without awaiting so
        # the original exception propagates promptly.
        if proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass
        raise

    rc = proc.returncode if proc.returncode is not None else -1
    return (
        rc,
        stdout_b.decode(errors="replace").strip(),
        stderr_b.decode(errors="replace").strip(),
    )


# ──────────────────────────────────────────────
# Environment allowlist for git subprocess
# ──────────────────────────────────────────────

# Allowlist of environment variables forwarded to ``git pull``. Anything not
# listed here (API keys, tokens, ...) is withheld, so git hooks that run
# during the pull cannot see it.
_GIT_SAFE_ENV_KEYS: frozenset[str] = frozenset(
    (
        # Identity / shell basics.
        "HOME", "USER", "LOGNAME", "SHELL", "PATH", "TERM",
        # Locale.
        "LANG", "LC_ALL", "LC_CTYPE",
        # Temporary directories.
        "TMPDIR", "TMP", "TEMP",
        # SSH transport: agent socket/pid and custom ssh command.
        "SSH_AUTH_SOCK", "SSH_AGENT_PID", "GIT_SSH", "GIT_SSH_COMMAND",
        # Proxies, in both lower- and upper-case spellings.
        "http_proxy", "https_proxy", "no_proxy",
        "HTTP_PROXY", "HTTPS_PROXY", "NO_PROXY",
    )
)


def _build_git_env() -> dict[str, str]:
    """Return a filtered copy of ``os.environ`` for running git.

    Only keys in :data:`_GIT_SAFE_ENV_KEYS` are kept. Two git knobs are then
    forced so git never blocks waiting for a human:
    - ``GIT_MERGE_AUTOEDIT=no``: no editor for merge commits;
    - ``GIT_TERMINAL_PROMPT=0``: no terminal prompts.
    """
    env: dict[str, str] = {}
    for key, value in os.environ.items():
        if key in _GIT_SAFE_ENV_KEYS:
            env[key] = value
    env.update(GIT_MERGE_AUTOEDIT="no", GIT_TERMINAL_PROMPT="0")
    return env


# ──────────────────────────────────────────────
# Per-command cooldown
# ──────────────────────────────────────────────

_COMMAND_COOLDOWN_SECONDS: float = 10.0
# Command name -> time.monotonic() timestamp of its last recorded invocation.
_last_invocation: dict[str, float] = {}


def _check_cooldown(cmd: str) -> float:
    """Return the seconds left before *cmd* may run again (0.0 if it is ready).

    A command that has never been recorded is always ready. Defaulting its
    last-run time to 0.0 would be wrong, because ``time.monotonic()`` has an
    undefined reference point. Its value can be small, e.g. shortly after boot
    on some platforms, which would make a never-run command look like it is
    cooling down.
    """
    last = _last_invocation.get(cmd)
    if last is None:
        return 0.0
    remaining = _COMMAND_COOLDOWN_SECONDS - (time.monotonic() - last)
    return max(0.0, remaining)


def _record_invocation(cmd: str) -> None:
    """Mark *cmd* as just invoked, which starts its cooldown window."""
    _last_invocation[cmd] = time.monotonic()


# ──────────────────────────────────────────────
# Deferred systemd restart
# ──────────────────────────────────────────────


async def _deferred_systemctl_restart(service: str, delay: float = 2.0) -> None:
    """Wait *delay* seconds, then run ``systemctl restart <service>``.

    Meant for restarting the unit that hosts this very process. The delay
    gives the caller time to send its ACK / reply before the restart kills
    us.

    Failures are logged rather than raised:
    - a non-zero exit status is logged at ERROR, with stderr (or stdout);
    - any ``Exception`` from running the command is logged with a traceback.
    """
    await asyncio.sleep(delay)
    logger.info("[ops_exec] Issuing systemctl restart %r (after %.1fs)", service, delay)
    try:
        status, stdout_text, stderr_text = await _run_subprocess(
            "systemctl", "restart", service, timeout=15.0
        )
        if status == 0:
            return
        logger.error(
            "[ops_exec] systemctl restart %r exited %d: %s",
            service,
            status,
            stderr_text or stdout_text,
        )
    except Exception:
        logger.exception("[ops_exec] Failed to restart service %r", service)


async def systemctl_restart(service: str, timeout: float = 20.0) -> tuple[bool, str]:
    """Restart another systemd unit and wait for the outcome.

    Runs ``systemctl restart <service>`` and returns ``(ok, summary)``, where
    *summary* is a short human-readable status line.

    Use this for units other than the one hosting this process (e.g. the
    proxy). To restart the current process, use
    :func:`_deferred_systemctl_restart` instead.

    Timeouts and other ``Exception`` failures are reported as
    ``ok=False`` rather than raised.
    """
    try:
        status, stdout_text, stderr_text = await _run_subprocess(
            "systemctl", "restart", service, timeout=timeout
        )
    except asyncio.TimeoutError:
        return False, f"`systemctl restart {service}` timed out ({timeout:.0f}s)"
    except Exception as exc:  # pragma: no cover - defensive
        return False, f"failed to restart `{service}`: {exc}"

    if status != 0:
        # Keep the summary short: at most the first four output lines.
        first_lines = (stderr_text or stdout_text or "(no output)").splitlines()[:4]
        return False, f"`systemctl restart {service}` exited {status}: " + " ".join(first_lines)
    return True, f"`{service}` restarted"
# ──────────────────────────────────────────────
# git pull (fleet-wide, with per-host/repo dedupe)
# ──────────────────────────────────────────────
class PullResult(NamedTuple):
    """Result returned by :func:`run_git_pull`.

    ``ran=False, ok=True`` means the pull was skipped cleanly because another
    local service already held the dedupe lock.
    """

    # Whether this caller actually executed ``git pull``.
    ran: bool
    # Whether the pull succeeded (or was skipped cleanly).
    ok: bool
    # Short human-readable status line.
    summary: str
def _resolve_repo_dir(repo_dir: Optional[str]) -> str:
    """Return *repo_dir* as an absolute path, or the CWD when it is ``None``/empty."""
    target = repo_dir if repo_dir else os.getcwd()
    return os.path.abspath(target)
def _kill_git_proc(proc: Optional[asyncio.subprocess.Process]) -> None:
    """Send SIGKILL to *proc* if it was started, tolerating an already-exited child."""
    if proc is None:
        return
    try:
        proc.kill()
    except ProcessLookupError:
        pass  # The child exited on its own just before the kill.
    except Exception:
        logger.debug("[ops_exec] failed to kill git process", exc_info=True)


async def _kill_and_reap_git(proc: Optional[asyncio.subprocess.Process]) -> None:
    """Kill *proc* (if started), then drain its pipes and wait for it. Best effort."""
    if proc is None:
        return
    _kill_git_proc(proc)
    try:
        await proc.communicate()
    except Exception:
        logger.debug("[ops_exec] failed to reap killed git process", exc_info=True)


async def run_git_pull(
    repo_dir: Optional[str],
    *,
    redis=None,
    dedupe: bool = True,
    lock_ttl: int = 90,
    instance_id: str = "",
    timeout: float = 60.0,
) -> PullResult:
    """Run ``git pull`` in *repo_dir* with optional per-host/repo dedupe.

    When *dedupe* is True and *redis* is provided, the caller contends on a
    Redis ``SET NX EX`` lock keyed by
    ``sg:control:pull:{hostname}:{abs_repo_path}``. Multiple co-located
    services sharing one checkout therefore pull only once. The lock
    self-partitions by host, so on a multi-host fleet each host pulls exactly
    once. If acquiring the lock raises, the pull proceeds without dedupe.

    Args:
        repo_dir: Checkout to pull in. ``None`` or empty means the current
            working directory.
        redis: Async Redis-like client exposing ``set(key, value, nx=, ex=)``
            and ``delete(key)``. ``None`` disables dedupe.
        dedupe: Whether to take the dedupe lock at all.
        lock_ttl: Lock expiry in seconds. It is raised to at least
            ``int(timeout) + 1`` so the lock cannot expire while this pull is
            still allowed to run. An expired lock would let a sibling service
            start a concurrent pull in the same checkout.
        instance_id: Value stored in the lock (``"1"`` if empty).
        timeout: Seconds before the git child is killed.

    Returns:
        A :class:`PullResult`:
        - ``ran=False, ok=True``: another local service is handling the pull
          (clean dedupe skip);
        - ``ran=True, ok=False``: spawn/IO errors, timeouts or a non-zero git
          exit status;
        - ``ran=True, ok=True``: the pull succeeded.

    The git child is killed on timeout, on spawn/IO errors, and when the
    calling task is cancelled, so no git process is left running unattended.
    The lock, if held, is released in every case.
    """
    abs_repo = _resolve_repo_dir(repo_dir)
    display = os.path.basename(abs_repo) or abs_repo

    lock_key: Optional[str] = None
    if dedupe and redis is not None:
        host = socket.gethostname()
        lock_key = f"sg:control:pull:{host}:{abs_repo}"
        # Never let the lock expire before this pull can time out.
        ttl = max(int(lock_ttl), int(timeout) + 1)
        try:
            got = await redis.set(lock_key, instance_id or "1", nx=True, ex=ttl)
        except Exception:
            logger.exception("[ops_exec] pull dedupe lock failed; proceeding without it")
            got = True
            lock_key = None
        if not got:
            return PullResult(ran=False, ok=True, summary=f"skipped pull (`{display}` already pulling)")

    env = _build_git_env()
    proc: Optional[asyncio.subprocess.Process] = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "git",
            "pull",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=abs_repo,
            env=env,
        )
        stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        await _kill_and_reap_git(proc)
        return PullResult(ran=True, ok=False, summary=f"`git pull` timed out in `{display}`")
    except asyncio.CancelledError:
        # The caller gave up. Don't leave git running unattended. No await here,
        # so the cancellation propagates promptly.
        _kill_git_proc(proc)
        raise
    except Exception as e:
        await _kill_and_reap_git(proc)
        return PullResult(ran=True, ok=False, summary=f"`git pull` failed in `{display}`: {e}")
    finally:
        if lock_key is not None:
            try:
                await redis.delete(lock_key)
            except Exception:
                # Best effort: the lock still expires after its TTL.
                logger.debug("[ops_exec] failed to release pull lock %r", lock_key, exc_info=True)

    rc = proc.returncode if proc.returncode is not None else -1
    out = stdout_b[:_MAX_PIPE_BYTES].decode(errors="replace").strip()
    err = stderr_b[:_MAX_PIPE_BYTES].decode(errors="replace").strip()
    combined = " ".join(filter(None, [out, err]))[:300] or "(no output)"
    if rc == 0:
        return PullResult(ran=True, ok=True, summary=f"pulled `{display}`: {combined}")
    return PullResult(ran=True, ok=False, summary=f"`git pull` exited {rc} in `{display}`: {combined}")