# Source code for media_cache

"""Disk-backed LRU media cache.

Caches downloaded media (images, audio, video, files) so that the same
URL is never fetched twice.  An in-memory index provides fast lookups
while a configurable disk directory persists data across restarts.

Each cached entry is stored on disk as two files::

    {sha256_of_url}.dat   – raw media bytes
    {sha256_of_url}.json  – sidecar metadata (mimetype, filename, url, ts, size)

On startup the disk directory is scanned to rebuild the in-memory index
*without* loading all bytes into RAM.
"""

from __future__ import annotations

import asyncio
import hashlib
import jsonutil as json
import logging
import time
from collections import OrderedDict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Awaitable, Callable

import aiofiles

logger = logging.getLogger(__name__)

# Type alias for an async callable that returns (bytes, mimetype, filename)
Downloader = Callable[[], Awaitable[tuple[bytes, str, str]]]


# ------------------------------------------------------------------
# Internal data types
# ------------------------------------------------------------------


@dataclass
class _CacheEntry:
    """In-memory index record describing a single cached media item.

    The record carries the metadata needed to find and describe a cached
    download.  The payload itself (``data``) is optional: it may be held
    in RAM as a soft cache, or be ``None``, in which case the bytes live
    only in the on-disk ``.dat`` file named after ``disk_key``.

    Field order is significant because the dataclass-generated
    ``__init__`` accepts these values positionally.
    """

    # Source URL of the media; also the key used by the cache index.
    url: str

    # MIME type reported for the media.
    mimetype: str

    # Human-readable filename associated with the media.
    filename: str

    # Byte length of the media data.
    size: int

    # SHA-256 hex digest used as the basename of the on-disk files.
    disk_key: str

    # Time of the most recent access, in epoch seconds.
    last_access: float

    # Raw media bytes when resident in memory; ``None`` when the entry is
    # indexed by metadata only (e.g. created by a disk-only scan).
    data: bytes | None = None


# ------------------------------------------------------------------
# MediaCache
# ------------------------------------------------------------------


class MediaCache:
    """Two-tier (memory + disk) LRU media cache.

    Parameters
    ----------
    cache_dir:
        Directory for persistent storage.  Created automatically.
    max_size_mb:
        Approximate cap on total disk usage in megabytes.  Oldest
        entries are evicted when the limit is exceeded.
    max_memory_items:
        Maximum number of entries whose *bytes* are kept in RAM.
        Entries beyond this limit are still indexed (metadata only)
        and will be read back from disk on the next access.
    """

    def __init__(
        self,
        cache_dir: str | Path = "media_cache",
        max_size_mb: int = 500,
        max_memory_items: int = 64,
    ) -> None:
        """Set up an empty cache rooted at *cache_dir* and create the directory.

        Configures the disk-byte budget and the in-memory item cap, and
        allocates the LRU index (an ``OrderedDict`` keyed by URL), the
        hit/miss counters, and the ``asyncio.Lock`` that serializes index
        mutations.  The disk scan is deferred to :meth:`ensure_loaded`;
        only the ``mkdir`` touches the filesystem here.

        Args:
            cache_dir (str | Path): Directory used for the ``.dat``/``.json``
                files; created automatically (including parents).
            max_size_mb (int): Approximate cap on total on-disk usage in
                megabytes; oldest entries are evicted once exceeded.
            max_memory_items (int): Maximum number of entries whose raw
                bytes are kept resident in RAM.
        """
        self._dir = Path(cache_dir)
        self._max_bytes = max_size_mb * 1024 * 1024
        self._max_memory = max_memory_items

        # Ordered by last access (most-recent at the end).
        self._index: OrderedDict[str, _CacheEntry] = OrderedDict()
        self._total_bytes: int = 0
        self._lock = asyncio.Lock()

        # Stats counters
        self._hits: int = 0
        self._misses: int = 0

        # Disk scan deferred to ensure_loaded() so startup is non-blocking
        self._scan_loaded: bool = False

        # Ensure the cache directory exists
        self._dir.mkdir(parents=True, exist_ok=True)

    async def ensure_loaded(self) -> None:
        """Load the in-memory index from disk without blocking the event loop.

        The synchronous :meth:`_scan_disk` runs in a worker thread via
        ``asyncio.to_thread``.  The scan happens at most once: the call is
        idempotent, and concurrent callers are serialized on the cache lock
        so only the first one scans.  Holding the lock also keeps
        :meth:`get` / :meth:`put` from mutating the index while the worker
        thread is populating it.
        """
        if self._scan_loaded:
            return
        async with self._lock:
            # Re-check under the lock: another task may have finished the
            # scan while this one was waiting.
            if self._scan_loaded:
                return
            await asyncio.to_thread(self._scan_disk)
            self._scan_loaded = True

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def get(self, url: str) -> tuple[bytes, str, str] | None:
        """Look up *url* and return its media triple, or ``None`` on a miss.

        On a hit the entry is promoted to most-recently-used and its
        ``last_access`` timestamp is refreshed.  If the entry's bytes are
        not resident in RAM they are re-read from disk via
        :meth:`_read_disk`; if that read yields nothing, the stale index
        entry is evicted and ``None`` is returned.  After a successful
        reload the memory cap is re-applied so that reads alone cannot
        grow the resident set past ``max_memory_items``.  Runs under the
        cache lock; increments the hit counter on success.

        Args:
            url (str): The media URL serving as the cache key.

        Returns:
            tuple[bytes, str, str] | None: ``(data, mimetype, filename)``
            if the URL is cached and its bytes are recoverable, otherwise
            ``None``.
        """
        async with self._lock:
            entry = self._index.get(url)
            if entry is None:
                return None

            # Promote to most-recently-used
            self._index.move_to_end(url)
            entry.last_access = time.time()

            # Hold the bytes in a local so the return value is unaffected
            # by any shedding below (e.g. when the memory cap is 0).
            data = entry.data
            if data is None:
                # Bytes aren't in memory: reload from disk
                data = await self._read_disk(entry.disk_key)
                if data is None:
                    # File vanished or unreadable: remove stale index entry
                    self._evict_entry(url)
                    return None
                entry.data = data
                # One more entry is now resident; enforce the memory cap.
                self._shed_memory()

            self._hits += 1
            logger.info(
                "Media cache HIT: %s (%s, %d bytes)",
                entry.filename,
                entry.mimetype,
                entry.size,
            )
            return data, entry.mimetype, entry.filename

    async def put(
        self,
        url: str,
        data: bytes,
        mimetype: str,
        filename: str,
    ) -> None:
        """Insert media bytes for *url*, writing through to disk.

        If the URL is already indexed this only bumps its LRU position and
        access time (the existing bytes are kept).  Otherwise the bytes and
        a JSON sidecar are persisted via :meth:`_write_disk`, the entry is
        added to the index, the oldest entries are evicted while over the
        disk budget (:meth:`_enforce_limits`), and resident bytes beyond
        the memory cap are released (:meth:`_shed_memory`).  Runs under
        the cache lock.

        Args:
            url (str): The media URL used as the cache key.
            data (bytes): The raw media bytes to persist.
            mimetype (str): The media MIME type, stored in the sidecar.
            filename (str): A human-readable filename, stored in the sidecar.
        """
        async with self._lock:
            # If already cached, just update access time
            existing = self._index.get(url)
            if existing is not None:
                self._index.move_to_end(url)
                existing.last_access = time.time()
                return

            entry = _CacheEntry(
                url=url,
                mimetype=mimetype,
                filename=filename,
                size=len(data),
                disk_key=self._url_to_key(url),
                last_access=time.time(),
                data=data,
            )

            # Write to disk first: the entry is indexed only if this succeeds.
            await self._write_disk(entry, data)

            self._index[url] = entry
            self._total_bytes += entry.size

            # Evict oldest entries if over disk budget
            await self._enforce_limits()

            # Shed in-memory bytes for old entries
            self._shed_memory()

    async def get_or_download(
        self,
        url: str,
        downloader: Downloader,
    ) -> tuple[bytes, str, str]:
        """Return cached media or call *downloader* and cache the result.

        *downloader* is an ``async`` callable returning
        ``(data, mimetype, filename)``.  A non-empty download is passed
        through ``maybe_reencode_gif`` before being stored, so the cached
        entry and the returned triple are whatever that helper produces.
        Empty downloads are returned as-is and never cached.

        Args:
            url (str): The media URL used as the cache key.
            downloader (Downloader): Zero-argument async callable that
                fetches the media on a cache miss.

        Returns:
            tuple[bytes, str, str]: ``(data, mimetype, filename)``.
        """
        cached = await self.get(url)
        if cached is not None:
            return cached

        # Cache miss: perform the download (outside the lock)
        self._misses += 1
        data, mimetype, filename = await downloader()

        # Never cache an empty/failed download.  Caching ``b""`` poisons the
        # cache so every later fetch of this URL returns a broken (empty)
        # image.  Return it un-cached so a re-send re-downloads and the
        # caller can drop/replace it.
        if not data:
            logger.warning(
                "Media download returned empty data for %s (%s); not caching",
                filename,
                mimetype,
            )
            return data, mimetype, filename

        # Re-encode GIF -> MP4 before caching so both the stored entry
        # and the returned data use the same format.
        from platforms.media_common import maybe_reencode_gif

        data, mimetype, filename = await maybe_reencode_gif(
            data,
            mimetype,
            filename,
        )

        logger.info(
            "Media cache MISS: downloading %s (%s, %d bytes)",
            filename,
            mimetype,
            len(data),
        )

        await self.put(url, data, mimetype, filename)
        return data, mimetype, filename

    def stats(self) -> dict[str, Any]:
        """Return a snapshot of cache statistics.

        A pure read of in-memory state: it acquires no lock and performs
        no I/O.

        Returns:
            dict[str, Any]: A snapshot with keys ``entries``,
            ``entries_in_memory``, ``total_bytes``, ``total_mb``,
            ``max_mb``, ``hits``, ``misses``, and ``hit_rate``
            (``0.0`` when there have been no lookups).
        """
        entries_in_memory = sum(
            1 for e in self._index.values() if e.data is not None
        )
        lookups = self._hits + self._misses
        return {
            "entries": len(self._index),
            "entries_in_memory": entries_in_memory,
            "total_bytes": self._total_bytes,
            "total_mb": round(self._total_bytes / (1024 * 1024), 2),
            "max_mb": round(self._max_bytes / (1024 * 1024), 2),
            "hits": self._hits,
            "misses": self._misses,
            "hit_rate": round(self._hits / lookups, 3) if lookups > 0 else 0.0,
        }

    # ------------------------------------------------------------------
    # Disk I/O helpers
    # ------------------------------------------------------------------

    def _url_to_key(self, url: str) -> str:
        """Derive the stable disk key for a media URL.

        Args:
            url (str): The media URL to hash.

        Returns:
            str: The SHA-256 hex digest of the URL's encoded bytes, used
            as the basename for both the ``.dat`` and ``.json`` files.
        """
        return hashlib.sha256(url.encode()).hexdigest()

    def _dat_path(self, disk_key: str) -> Path:
        """Return the path of the raw-bytes file for *disk_key*.

        Args:
            disk_key (str): The key naming the entry's files.

        Returns:
            Path: ``{cache_dir}/{disk_key}.dat``.
        """
        return self._dir / f"{disk_key}.dat"

    def _meta_path(self, disk_key: str) -> Path:
        """Return the path of the JSON metadata sidecar for *disk_key*.

        Args:
            disk_key (str): The key naming the entry's files.

        Returns:
            Path: ``{cache_dir}/{disk_key}.json``.
        """
        return self._dir / f"{disk_key}.json"

    async def _write_disk(self, entry: _CacheEntry, data: bytes) -> None:
        """Persist an entry's bytes and its JSON metadata sidecar to disk.

        Writes the raw bytes to the ``.dat`` file and a JSON document
        (``url``, ``mimetype``, ``filename``, ``size``, ``ts``) to the
        ``.json`` sidecar, both through ``aiofiles``.  If either write
        fails or is cancelled, whatever was written for this key is
        removed again before the exception propagates.  A lone ``.dat``
        would otherwise never be cleaned up, because the startup scan
        only discovers entries through their ``.json`` sidecar.

        Args:
            entry (_CacheEntry): The index record supplying the disk key
                and the sidecar fields.
            data (bytes): The raw media bytes to write.

        Raises:
            OSError: If writing either file fails.
        """
        dat_path = self._dat_path(entry.disk_key)
        meta_path = self._meta_path(entry.disk_key)

        meta = {
            "url": entry.url,
            "mimetype": entry.mimetype,
            "filename": entry.filename,
            "size": entry.size,
            "ts": entry.last_access,
        }

        try:
            async with aiofiles.open(dat_path, "wb") as f:
                await f.write(data)

            async with aiofiles.open(meta_path, "w", encoding="utf-8") as f:
                await f.write(json.dumps(meta, indent=2))
        except BaseException:
            # Cleanup-and-reraise (covers task cancellation too): do not
            # leave a half-written pair behind.
            self._delete_disk(entry.disk_key)
            raise

    async def _read_disk(self, disk_key: str) -> bytes | None:
        """Read an entry's raw bytes back from its ``.dat`` file.

        Args:
            disk_key (str): The key identifying the entry.

        Returns:
            bytes | None: The file contents, or ``None`` if the file is
            missing (silently) or could not be read (a warning is logged).
        """
        dat_path = self._dat_path(disk_key)
        try:
            async with aiofiles.open(dat_path, "rb") as f:
                return await f.read()
        except FileNotFoundError:
            return None
        except OSError:
            logger.warning("Failed to read cached file %s", dat_path)
            return None

    def _delete_disk(self, disk_key: str) -> None:
        """Delete both on-disk files for *disk_key*, best-effort.

        A missing file is ignored; any other ``OSError`` is logged as a
        warning and does not propagate.

        Args:
            disk_key (str): The key whose files should be removed.
        """
        for path in (self._dat_path(disk_key), self._meta_path(disk_key)):
            try:
                path.unlink(missing_ok=True)
            except OSError:
                logger.warning("Could not delete cache file %s", path)

    # ------------------------------------------------------------------
    # Startup disk scan
    # ------------------------------------------------------------------

    def _scan_disk(self) -> None:
        """Populate the in-memory index from existing disk cache files.

        Only metadata is loaded; actual bytes stay on disk until
        requested.  The scan is tolerant of damaged state: an unreadable,
        undecodable or malformed sidecar is skipped with a warning instead
        of aborting the scan, and a sidecar whose ``.dat`` file is missing
        is deleted as an orphan.  URLs that are already indexed are left
        untouched so that their bytes are not counted twice.
        """
        count = 0
        for meta_path in sorted(self._dir.glob("*.json")):
            try:
                raw = meta_path.read_text(encoding="utf-8")
                meta = json.loads(raw)
            except (OSError, ValueError, json.JSONDecodeError):
                # ValueError also covers UnicodeDecodeError from read_text.
                logger.warning("Skipping unreadable cache meta %s", meta_path)
                continue

            disk_key = meta_path.stem  # filename without .json
            dat_path = self._dat_path(disk_key)

            try:
                dat_size = dat_path.stat().st_size
            except FileNotFoundError:
                # Orphaned metadata: clean up (best-effort)
                try:
                    meta_path.unlink(missing_ok=True)
                except OSError:
                    logger.warning("Could not delete cache file %s", meta_path)
                continue
            except OSError:
                logger.warning("Skipping unreadable cache meta %s", meta_path)
                continue

            url = meta.get("url") if isinstance(meta, dict) else None
            if not isinstance(url, str):
                # Without a URL the entry cannot be keyed; one bad sidecar
                # must not prevent the rest of the cache from loading.
                logger.warning("Skipping unreadable cache meta %s", meta_path)
                continue

            if url in self._index:
                # Keep the existing (possibly memory-resident) entry and
                # avoid adding its size to the total a second time.
                continue

            size = meta.get("size", 0)
            if not isinstance(size, int) or size <= 0:
                size = dat_size

            ts = meta.get("ts", 0.0)
            if not isinstance(ts, (int, float)):
                ts = 0.0  # keeps the LRU sort below well-defined

            self._index[url] = _CacheEntry(
                url=url,
                mimetype=meta.get("mimetype", "application/octet-stream"),
                filename=meta.get("filename", "attachment"),
                size=size,
                disk_key=disk_key,
                last_access=ts,
                data=None,  # don't load bytes on startup
            )
            self._total_bytes += size
            count += 1

        # Reorder in place so the OrderedDict is in LRU order (oldest
        # first).  ``sorted`` materializes the key list before any move.
        for url in sorted(self._index, key=lambda u: self._index[u].last_access):
            self._index.move_to_end(url)

        if count:
            logger.info(
                "Media cache: loaded %d entries from disk (%.1f MB)",
                count,
                self._total_bytes / (1024 * 1024),
            )

    # ------------------------------------------------------------------
    # Eviction
    # ------------------------------------------------------------------

    async def _enforce_limits(self) -> None:
        """Evict least-recently-used entries until disk usage is within budget.

        Repeatedly evicts the entry at the front of the LRU index via
        :meth:`_evict_entry` while ``self._total_bytes`` exceeds
        ``self._max_bytes`` and the index is non-empty.  The caller holds
        the lock.
        """
        while self._total_bytes > self._max_bytes and self._index:
            # The least-recently-used entry is at the front of the index.
            self._evict_entry(next(iter(self._index)))

    def _evict_entry(self, url: str) -> None:
        """Remove one entry from the in-memory index and from disk.

        Pops the entry for *url*, subtracts its size from
        ``self._total_bytes``, and deletes its files via
        :meth:`_delete_disk`.  A no-op if the URL is not indexed.

        Args:
            url (str): The cache key of the entry to remove.
        """
        entry = self._index.pop(url, None)
        if entry is None:
            return
        self._total_bytes -= entry.size
        self._delete_disk(entry.disk_key)
        logger.debug("Media cache: evicted %s (%s)", entry.filename, url)

    def _shed_memory(self) -> None:
        """Release resident bytes for the oldest entries to honor the memory cap.

        Sets ``data`` to ``None`` on the least-recently-used resident
        entries until at most ``self._max_memory`` entries hold bytes.
        The entries stay indexed and no I/O is performed here.
        """
        resident = [e for e in self._index.values() if e.data is not None]
        excess = len(resident) - self._max_memory
        # ``resident`` follows index order (oldest first), so the leading
        # ``excess`` entries are the ones to drop.
        for entry in resident[: max(excess, 0)]:
            entry.data = None