"""Gather OS-level server statistics for system prompt injection.
All heavy reads (``/proc``, ``psutil``) are wrapped in
:func:`asyncio.to_thread` so the event loop is never blocked.
Results are cached for 30 seconds to avoid redundant ``psutil``
reads on every inference.
"""
from __future__ import annotations
import asyncio
import logging
import platform
import time
from typing import Any
logger = logging.getLogger(__name__)

# Module-level TTL cache (30s) so psutil reads aren't repeated per-message.
# ``_cache`` holds a copy of the last successfully gathered stats dict and
# stays ``None`` until the first successful gather.
# ``_cache_ts`` is the ``time.monotonic()`` reading associated with that
# entry; ``_CACHE_TTL`` is how many seconds the entry is considered fresh.
_cache: dict[str, Any] | None = None
_cache_ts: float = 0.0
_CACHE_TTL = 30.0
def _humanize_bytes(b: int) -> str:
"""Format bytes as a human-readable ``X.X GB`` string."""
gb = b / (1024 ** 3)
if gb >= 1.0:
return f"{gb:.1f} GB"
mb = b / (1024 ** 2)
return f"{mb:.0f} MB"
def _detect_cpu_model() -> str:
    """Return a best-effort, human-readable CPU model name.

    Sources are tried in order:

    1. the first ``model name`` entry in ``/proc/cpuinfo``;
    2. the ``Model name:`` line printed by ``lscpu``;
    3. :func:`platform.processor`, or :func:`platform.machine` when the
       former is empty.

    Any error raised while probing (missing file, missing binary,
    timeout, malformed line) falls through to step 3, so probing
    failures never propagate to the caller.
    """
    import os
    import subprocess

    try:
        with open("/proc/cpuinfo", encoding="utf-8") as f:
            for line in f:
                if line.startswith("model name"):
                    return line.split(":", 1)[1].strip()

        # ARM / aarch64 doesn't have "model name" in cpuinfo; fall back
        # to lscpu. Its field labels are translated under non-English
        # locales, so force the C locale to keep "Model name:" matchable.
        out = subprocess.check_output(
            ["lscpu"],
            text=True,
            timeout=2,
            env={**os.environ, "LC_ALL": "C"},
        )
        for line in out.splitlines():
            if line.startswith("Model name:"):
                return line.split(":", 1)[1].strip()
    except Exception:
        # Deliberately best-effort: whatever went wrong above, the
        # platform-module fallback below still yields a usable label.
        pass

    return platform.processor() or platform.machine()


def _gather_sync() -> dict[str, Any]:
    """Blocking helper that collects all stats in one shot.

    Performs file, subprocess and ``psutil`` reads synchronously, so it
    is meant to be run off the event loop.

    Returns
    -------
    dict[str, Any]
        Keys, in insertion order: ``cpu_model``, ``cpu_cores``,
        ``cpu_usage``, ``os``, ``ram``, ``disk``, ``running_processes``
        and ``logged_in_users``. Usage figures are pre-formatted strings.

    Raises
    ------
    Exception
        ``ImportError`` if ``psutil`` is unavailable, plus anything
        ``psutil`` raises for the CPU, RAM, disk or process reads; only
        the CPU-model, OS-name and logged-in-user probes are guarded.
    """
    import psutil

    stats: dict[str, Any] = {}

    # ── CPU ──
    stats["cpu_model"] = _detect_cpu_model()
    stats["cpu_cores"] = psutil.cpu_count(logical=True)
    # Non-blocking instant read (uses cached delta from previous call).
    # NOTE(review): psutil documents that the very first call with
    # interval=None returns a meaningless 0.0 — confirm that is acceptable
    # for the first cached snapshot.
    stats["cpu_usage"] = f"{psutil.cpu_percent(interval=None):.1f}%"

    # ── OS ──
    try:
        release = platform.freedesktop_os_release()
        stats["os"] = release.get("PRETTY_NAME", release.get("NAME", "Linux"))
    except Exception:
        # No os-release data available: use the generic platform string.
        stats["os"] = platform.platform()

    # ── RAM ──
    mem = psutil.virtual_memory()
    stats["ram"] = (
        f"{_humanize_bytes(mem.used)} / {_humanize_bytes(mem.total)}"
        f" ({mem.percent:.0f}%)"
    )

    # ── Disk ── (root filesystem only)
    disk = psutil.disk_usage("/")
    stats["disk"] = (
        f"{_humanize_bytes(disk.used)} / {_humanize_bytes(disk.total)}"
        f" ({disk.percent:.0f}%)"
    )

    # ── Processes ──
    stats["running_processes"] = len(psutil.pids())

    # ── Logged-in users (utmp) ──
    try:
        # Count distinct account names, not sessions.
        stats["logged_in_users"] = len({u.name for u in psutil.users()})
    except Exception:
        stats["logged_in_users"] = 0

    return stats
async def get_server_stats(
    background_task_count: int = 0,
) -> dict[str, Any]:
    """Return a dict of live server statistics.

    Results are cached for 30 seconds to avoid redundant psutil reads.
    The blocking collection runs in a worker thread via
    :func:`asyncio.to_thread`.

    Parameters
    ----------
    background_task_count:
        Number of currently active background tool tasks, provided
        by the caller (typically from :class:`TaskManager`).

    Returns
    -------
    dict[str, Any]
        A fresh dict (safe for the caller to mutate) holding the gathered
        stats plus a ``background_tasks`` entry. If gathering fails, the
        failure is logged at debug level and an empty dict is returned.
    """
    global _cache, _cache_ts

    now = time.monotonic()
    if _cache is not None and (now - _cache_ts) < _CACHE_TTL:
        stats = dict(_cache)  # shallow copy so caller can mutate
        # The task count is per-call data, so it is never stored in the cache.
        stats["background_tasks"] = background_task_count
        return stats

    try:
        stats = await asyncio.to_thread(_gather_sync)
    except Exception:
        # Best-effort: stats are optional, so a failure must not propagate.
        logger.debug("Failed to gather server stats", exc_info=True)
        return {}

    # Store a copy so later mutation of the returned dict can't leak
    # into the cache.
    _cache = dict(stats)
    _cache_ts = now
    stats["background_tasks"] = background_task_count
    return stats