# Source code for server_stats

"""Gather OS-level server statistics for system prompt injection.

All heavy reads (``/proc``, ``psutil``) are wrapped in
:func:`asyncio.to_thread` so the event loop is never blocked.

Results are cached for 30 seconds to avoid redundant ``psutil``
reads on every inference.
"""

from __future__ import annotations

import asyncio
import logging
import platform
import time
from typing import Any

logger = logging.getLogger(__name__)

# Module-level TTL cache (30s) so psutil reads aren't repeated per-message.
# ``_cache`` holds the last stats dict produced by ``_gather_sync`` (None
# until the first successful gather); ``_cache_ts`` is the
# ``time.monotonic()`` timestamp of that gather, compared against
# ``_CACHE_TTL`` inside ``get_server_stats``.
_cache: dict[str, Any] | None = None
_cache_ts: float = 0.0
_CACHE_TTL = 30.0


def _humanize_bytes(b: int) -> str:
    """Format bytes as a human-readable ``X.X GB`` string."""
    gb = b / (1024 ** 3)
    if gb >= 1.0:
        return f"{gb:.1f} GB"
    mb = b / (1024 ** 2)
    return f"{mb:.0f} MB"


def _gather_sync() -> dict[str, Any]:
    """Blocking helper: collect every stat in one pass.

    Runs inside ``asyncio.to_thread``, so it is free to read ``/proc``,
    spawn ``lscpu`` and make blocking psutil calls.

    Returns
    -------
    dict
        Keys: ``cpu_model``, ``cpu_cores``, ``cpu_usage``, ``os``, ``ram``,
        ``disk``, ``running_processes``, ``logged_in_users``.
    """
    import psutil

    info: dict[str, Any] = {}

    # ── CPU ──
    try:
        # x86 kernels expose the model under "model name" in /proc/cpuinfo.
        with open("/proc/cpuinfo", encoding="utf-8") as cpuinfo:
            model = next(
                (
                    row.split(":", 1)[1].strip()
                    for row in cpuinfo
                    if row.startswith("model name")
                ),
                None,
            )
        if model is not None:
            info["cpu_model"] = model
        else:
            # ARM / aarch64 doesn't have "model name" in cpuinfo;
            # fall back to lscpu which always has it.
            import subprocess

            lscpu_out = subprocess.check_output(["lscpu"], text=True, timeout=2)
            for row in lscpu_out.splitlines():
                if row.startswith("Model name:"):
                    info["cpu_model"] = row.split(":", 1)[1].strip()
                    break
        # Last resort when neither source produced a model string.
        info.setdefault("cpu_model", platform.processor() or platform.machine())
    except Exception:
        info["cpu_model"] = platform.processor() or platform.machine()

    info["cpu_cores"] = psutil.cpu_count(logical=True)

    # Non-blocking instant read (uses cached delta from previous call).
    info["cpu_usage"] = f"{psutil.cpu_percent(interval=None):.1f}%"

    # ── OS ──
    try:
        os_release = platform.freedesktop_os_release()
        info["os"] = str(os_release.get("PRETTY_NAME", os_release.get("NAME", "Linux")))
    except Exception:
        # Non-freedesktop systems (or missing os-release file).
        info["os"] = platform.platform()

    # ── RAM ──
    vm = psutil.virtual_memory()
    info["ram"] = (
        f"{_humanize_bytes(vm.used)} / {_humanize_bytes(vm.total)}"
        f" ({vm.percent:.0f}%)"
    )

    # ── Disk ──
    root_fs = psutil.disk_usage("/")
    info["disk"] = (
        f"{_humanize_bytes(root_fs.used)} / {_humanize_bytes(root_fs.total)}"
        f" ({root_fs.percent:.0f}%)"
    )

    # ── Processes ──
    info["running_processes"] = len(psutil.pids())

    # ── Logged-in users (utmp) ──
    try:
        info["logged_in_users"] = len({session.name for session in psutil.users()})
    except Exception:
        # utmp may be unreadable (e.g. in containers); report zero.
        info["logged_in_users"] = 0

    return info


async def get_server_stats(
    background_task_count: int = 0,
) -> dict[str, Any]:
    """Return a dict of live server statistics.

    Results are cached for 30 seconds (module-level TTL cache) so psutil
    is not re-read on every call; the blocking gather runs in a worker
    thread via :func:`asyncio.to_thread`.

    Parameters
    ----------
    background_task_count:
        Number of currently active background tool tasks, provided by the
        caller (typically from :class:`TaskManager`).

    Returns
    -------
    dict
        Stats from ``_gather_sync`` plus ``background_tasks``; an empty
        dict when gathering fails.
    """
    global _cache, _cache_ts

    ts = time.monotonic()

    # Fast path: serve from the TTL cache while it is still fresh.
    if _cache is not None and (ts - _cache_ts) < _CACHE_TTL:
        snapshot = dict(_cache)  # shallow copy so caller can mutate
        snapshot["background_tasks"] = background_task_count
        return snapshot

    # Slow path: gather off-loop, then refresh the cache.
    try:
        snapshot = await asyncio.to_thread(_gather_sync)
    except Exception:
        # Best effort: stats are cosmetic, so never propagate the failure.
        logger.debug("Failed to gather server stats", exc_info=True)
        return {}

    _cache = dict(snapshot)
    _cache_ts = ts
    snapshot["background_tasks"] = background_task_count
    return snapshot