"""Shared OS-level operation primitives for admin / cluster control commands.
This module is the canonical home for the hardened subprocess, git-environment,
cooldown and systemd-restart helpers used by both:
- :mod:`message_processor.admin_ops_commands` (the legacy single-service
``!bot_restart`` / ``!proxy_restart`` / ``!bot_pull`` worker path), and
- :mod:`core.control_ops` (the fleet-wide control-ops daemon + publisher).
It is **stdlib-only** on purpose: ``message_processor.admin_ops_commands`` is
loaded in isolation by ``tests/test_admin_ops_commands.py`` (by file path, with
the heavy app stack stubbed), and it imports the primitives below — so this
module must never pull in redis / discord / config at import time. The Redis
client used for the git-pull dedupe lock is always passed in as an argument.
Security hardening carried over from admin_ops_commands (2026-04-28):
- ``asyncio.create_subprocess_exec`` (no ``shell=True``) — no injection.
- Per-command cooldown to prevent rapid-repeat DoS.
- Subprocess output truncated to ``_MAX_PIPE_BYTES`` per stream before decoding (the full output is still buffered while it is read).
- git pull kills the subprocess on timeout (no orphaned index.lock).
- git pull uses an environment allowlist (no secret leakage to git hooks).
"""
from __future__ import annotations
import asyncio
import logging
import os
import socket
import time
from typing import NamedTuple, Optional
# Module logger. A fixed name rather than ``__name__``, presumably because the
# module is also loaded by file path in tests (see module docstring).
logger = logging.getLogger("ops_exec")
# ──────────────────────────────────────────────
# Subprocess helper
# ──────────────────────────────────────────────
# Per-stream cap, in bytes, on subprocess stdout/stderr that is decoded and
# returned. Output is read in full by ``communicate()`` and truncated
# afterwards, so this bounds the size of the returned strings, not the memory
# used while reading.
_MAX_PIPE_BYTES: int = 16_384  # 16 KiB
async def _run_subprocess(*args: str, timeout: float = 30.0) -> tuple[int, str, str]:
    """Run a subprocess and return ``(returncode, stdout, stderr)``.

    *args* are passed straight to :func:`asyncio.create_subprocess_exec`.
    There is no shell, so no expansion or injection. Each output stream is
    truncated to ``_MAX_PIPE_BYTES`` bytes, decoded with undecodable bytes
    replaced, and stripped of surrounding whitespace. ``communicate()`` still
    buffers the child's full output, so the cap bounds the returned text, not
    peak memory.

    Raises:
        asyncio.TimeoutError: if *timeout* seconds elapse first. The child is
            killed and reaped (with a bounded drain) before the error
            propagates.
    """
    proc = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    try:
        stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        try:
            proc.kill()
        except ProcessLookupError:
            # The child exited between the timeout firing and kill(). There is
            # nothing left to kill, but the caller must still see the timeout.
            pass
        try:
            # Reap the child. The wait is bounded because grandchildren that
            # inherited the pipes could otherwise keep them open indefinitely.
            await asyncio.wait_for(proc.communicate(), timeout=5.0)
        except asyncio.TimeoutError:
            logger.warning(
                "[ops_exec] %s: output pipes still open after kill; gave up draining",
                " ".join(args),
            )
        # Bare raise re-raises the original (outer) TimeoutError.
        raise
    rc = proc.returncode if proc.returncode is not None else -1
    return (
        rc,
        stdout_b[:_MAX_PIPE_BYTES].decode(errors="replace").strip(),
        stderr_b[:_MAX_PIPE_BYTES].decode(errors="replace").strip(),
    )
# ──────────────────────────────────────────────
# Environment allowlist for git subprocess
# ──────────────────────────────────────────────
# Only these environment variables are passed to ``git pull``. This prevents
# secret leakage (API keys, tokens) to git hooks that might run during the pull.
_GIT_SAFE_ENV_KEYS: frozenset[str] = frozenset(
{
"HOME",
"PATH",
"USER",
"LANG",
"LC_ALL",
"LC_CTYPE",
"LOGNAME",
"SHELL",
"TERM",
"SSH_AUTH_SOCK",
"SSH_AGENT_PID", # needed for SSH key-based pulls
"GIT_SSH",
"GIT_SSH_COMMAND", # custom SSH config
"TMPDIR",
"TMP",
"TEMP",
"http_proxy",
"https_proxy",
"no_proxy",
"HTTP_PROXY",
"HTTPS_PROXY",
"NO_PROXY",
}
)
def _build_git_env() -> dict[str, str]:
    """Return a scrubbed copy of ``os.environ`` for running git.

    Only variables named in ``_GIT_SAFE_ENV_KEYS`` survive. Two settings are
    then forced on top so git never waits on a human:
    ``GIT_MERGE_AUTOEDIT=no`` (no merge-message editor) and
    ``GIT_TERMINAL_PROMPT=0`` (no terminal credential prompt).
    """
    safe = dict(
        (name, value)
        for name, value in os.environ.items()
        if name in _GIT_SAFE_ENV_KEYS
    )
    safe.update(GIT_MERGE_AUTOEDIT="no", GIT_TERMINAL_PROMPT="0")
    return safe
# ──────────────────────────────────────────────
# Per-command cooldown
# ──────────────────────────────────────────────
_COMMAND_COOLDOWN_SECONDS: float = 10.0
_last_invocation: dict[str, float] = {}
def _check_cooldown(cmd: str) -> float:
"""Return remaining cooldown seconds, or 0.0 if *cmd* is ready."""
last = _last_invocation.get(cmd, 0.0)
remaining = _COMMAND_COOLDOWN_SECONDS - (time.monotonic() - last)
return max(0.0, remaining)
def _record_invocation(cmd: str) -> None:
"""Mark *cmd* as just invoked (starts the cooldown window)."""
_last_invocation[cmd] = time.monotonic()
# ──────────────────────────────────────────────
# Deferred systemd restart
# ──────────────────────────────────────────────
async def _deferred_systemctl_restart(service: str, delay: float = 2.0) -> None:
    """Wait *delay* seconds, then run ``systemctl restart <service>``.

    Intended for restarting the unit this process belongs to. The pause lets
    the caller flush its ACK / reply before the restart kills this very
    process. Meant to be scheduled fire-and-forget: a non-zero exit status or
    an exception from the restart attempt is logged rather than raised.
    """
    await asyncio.sleep(delay)
    logger.info("[ops_exec] Issuing systemctl restart %r (after %.1fs)", service, delay)
    try:
        exit_code, stdout_text, stderr_text = await _run_subprocess(
            "systemctl", "restart", service, timeout=15.0
        )
        if exit_code == 0:
            return
        # Prefer stderr for the diagnostic; fall back to stdout when it is empty.
        logger.error(
            "[ops_exec] systemctl restart %r exited %d: %s",
            service,
            exit_code,
            stderr_text or stdout_text,
        )
    except Exception:
        logger.exception("[ops_exec] Failed to restart service %r", service)
async def systemctl_restart(service: str, timeout: float = 20.0) -> tuple[bool, str]:
    """Run ``systemctl restart <service>`` and wait for it to finish.

    Use this for units other than the current process (e.g. the proxy). To
    restart the unit this process runs in, use
    :func:`_deferred_systemctl_restart` instead, so a reply can be flushed
    before the process is killed.

    Returns:
        ``(ok, summary)``. ``ok`` is True only when systemctl exits 0. On
        failure, ``summary`` gives one of three causes:
        - a timeout (``_run_subprocess`` kills the systemctl child);
        - an exception while spawning or running it;
        - the non-zero exit code plus up to the first four lines of stderr
          (stdout if stderr is empty).
        These failures are reported in the result rather than raised.
    """
    try:
        rc, out, err = await _run_subprocess("systemctl", "restart", service, timeout=timeout)
    except asyncio.TimeoutError:
        return False, f"`systemctl restart {service}` timed out ({timeout:.0f}s)"
    except Exception as e:  # pragma: no cover - defensive
        return False, f"failed to restart `{service}`: {e}"
    if rc == 0:
        return True, f"`{service}` restarted"
    # Keep the reply to a single line: join at most four output lines.
    detail = (err or out or "(no output)").splitlines()
    return False, f"`systemctl restart {service}` exited {rc}: " + " ".join(detail[:4])
# ──────────────────────────────────────────────
# git pull (fleet-wide, with per-host/repo dedupe)
# ──────────────────────────────────────────────
class PullResult(NamedTuple):
    """Outcome of :func:`run_git_pull`.

    ``ran=False, ok=True`` is the clean dedupe skip: another co-located
    service already holds the pull lock for this host/repo.
    """

    # True only if this caller actually executed ``git pull``.
    ran: bool
    # True if the pull succeeded, or was skipped cleanly via dedupe.
    ok: bool
    # Short human-readable status line (uses Markdown-style backticks).
    summary: str
def _resolve_repo_dir(repo_dir: Optional[str]) -> str:
return os.path.abspath(repo_dir or os.getcwd())
async def _kill_and_reap(proc: asyncio.subprocess.Process, drain_timeout: float = 5.0) -> None:
    """Best-effort kill of *proc*, then a bounded wait for it to be reaped.

    The drain is bounded because processes spawned by git may have inherited
    the stdout/stderr pipes and keep them open after git itself is killed.
    """
    try:
        proc.kill()
    except OSError:
        # Typically ProcessLookupError: the process already exited between the
        # failure and kill(). Still try to reap it below.
        logger.debug("[ops_exec] kill of git process failed", exc_info=True)
    try:
        await asyncio.wait_for(proc.communicate(), timeout=drain_timeout)
    except Exception:
        logger.debug("[ops_exec] could not fully drain killed git process", exc_info=True)


async def run_git_pull(
    repo_dir: Optional[str],
    *,
    redis=None,
    dedupe: bool = True,
    lock_ttl: int = 90,
    instance_id: str = "",
    timeout: float = 60.0,
) -> PullResult:
    """Run ``git pull`` in *repo_dir* with optional per-host/repo dedupe.

    When *dedupe* is True and *redis* is provided, contend on a Redis SETNX lock
    keyed by ``sg:control:pull:{hostname}:{abs_repo_path}``. Multiple
    co-located services sharing one checkout then pull only once. The lock
    self-partitions by host, so on a multi-host fleet each host pulls exactly
    once. If Redis itself errors, the pull proceeds without the lock (fail
    open). The lock is released as soon as the pull finishes, so it only
    dedupes concurrent callers.

    Args:
        repo_dir: checkout to pull in; ``None`` or empty means the current
            working directory. Resolved to an absolute path.
        redis: awaitable Redis-like client exposing
            ``set(key, value, nx=..., ex=...)`` and ``delete(key)``. Only used
            for dedupe.
        dedupe: whether to contend on the pull lock before running git.
        lock_ttl: lock expiry in seconds.
            NOTE(review): if the pull (plus kill/drain) outlives *lock_ttl*,
            the lock can expire mid-pull and a second caller may acquire it.
            The unconditional release here would then delete that caller's
            lock. Keep *lock_ttl* above *timeout*.
        instance_id: value stored under the lock key (``"1"`` if empty).
        timeout: seconds allowed for ``git pull`` before it is killed.

    Returns:
        A :class:`PullResult`. ``ran=False, ok=True`` means another local
        service is handling the pull (clean dedupe skip). Git and Redis
        failures are reported in the result rather than raised.
    """
    abs_repo = _resolve_repo_dir(repo_dir)
    display = os.path.basename(abs_repo) or abs_repo

    lock_key: Optional[str] = None
    if dedupe and redis is not None:
        lock_key = f"sg:control:pull:{socket.gethostname()}:{abs_repo}"
        try:
            got = await redis.set(lock_key, instance_id or "1", nx=True, ex=lock_ttl)
        except Exception:
            # Fail open: pull without dedupe rather than skip the pull.
            logger.exception("[ops_exec] pull dedupe lock failed; proceeding without it")
            got = True
            lock_key = None  # we do not hold the lock, so never delete it
        if not got:
            return PullResult(ran=False, ok=True, summary=f"skipped pull (`{display}` already pulling)")

    env = _build_git_env()
    proc: Optional[asyncio.subprocess.Process] = None
    try:
        proc = await asyncio.create_subprocess_exec(
            "git", "pull",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            cwd=abs_repo,
            env=env,
        )
        stdout_b, stderr_b = await asyncio.wait_for(proc.communicate(), timeout=timeout)
    except asyncio.TimeoutError:
        if proc is not None:
            await _kill_and_reap(proc)
        return PullResult(ran=True, ok=False, summary=f"`git pull` timed out in `{display}`")
    except Exception as e:
        # e.g. git missing from PATH or the repo dir is gone; then proc is None.
        logger.warning("[ops_exec] git pull failed in %s", abs_repo, exc_info=True)
        if proc is not None:
            await _kill_and_reap(proc)
        return PullResult(ran=True, ok=False, summary=f"`git pull` failed in `{display}`: {e}")
    finally:
        if lock_key is not None:
            try:
                await redis.delete(lock_key)
            except Exception:
                # Best effort: the key still expires after lock_ttl seconds.
                logger.warning("[ops_exec] failed to release pull lock %s", lock_key, exc_info=True)

    rc = proc.returncode if proc.returncode is not None else -1
    out = stdout_b[:_MAX_PIPE_BYTES].decode(errors="replace").strip()
    err = stderr_b[:_MAX_PIPE_BYTES].decode(errors="replace").strip()
    combined = " ".join(filter(None, [out, err]))[:300] or "(no output)"
    if rc == 0:
        return PullResult(ran=True, ok=True, summary=f"pulled `{display}`: {combined}")
    return PullResult(ran=True, ok=False, summary=f"`git pull` exited {rc} in `{display}`: {combined}")