# Source code for media_cache

"""Disk-backed LRU media cache.

Caches downloaded media (images, audio, video, files) so that the same
URL is never fetched twice.  An in-memory index provides fast lookups
while a configurable disk directory persists data across restarts.

Each cached entry is stored on disk as two files::

    {sha256_of_url}.dat   – raw media bytes
    {sha256_of_url}.json  – sidecar metadata (mimetype, filename, url, ts, size)

On startup the disk directory is scanned to rebuild the in-memory index
*without* loading all bytes into RAM.
"""

from __future__ import annotations

import asyncio
import hashlib
import json
import logging
import time
from collections import OrderedDict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Awaitable, Callable

import aiofiles

logger = logging.getLogger(__name__)

# Type alias for an async callable that returns (bytes, mimetype, filename)
Downloader = Callable[[], Awaitable[tuple[bytes, str, str]]]


# ------------------------------------------------------------------
# Internal data types
# ------------------------------------------------------------------

@dataclass
class _CacheEntry:
    """Lightweight index record – bytes are only loaded on demand.

    One instance per cached URL.  The heavyweight payload lives on disk
    (see the module docstring for the ``.dat``/``.json`` file layout);
    this record keeps just enough metadata for LRU bookkeeping and for
    serving a hit without re-reading the sidecar.
    """

    url: str  # original media URL (also the index key in MediaCache)
    mimetype: str  # e.g. "image/png"; stored in the .json sidecar
    filename: str  # suggested download filename for the media
    size: int  # byte length of the media data
    disk_key: str  # sha256 hex used for filenames on disk
    last_access: float  # epoch seconds

    # When populated, holds the raw media bytes (soft in-memory cache).
    # May be ``None`` if the entry was loaded from a disk-only scan.
    data: bytes | None = None


# ------------------------------------------------------------------
# MediaCache
# ------------------------------------------------------------------

class MediaCache:
    """Two-tier (memory + disk) LRU media cache.

    Parameters
    ----------
    cache_dir:
        Directory for persistent storage.  Created automatically.
    max_size_mb:
        Approximate cap on total disk usage in megabytes.  Oldest
        entries are evicted when the limit is exceeded.
    max_memory_items:
        Maximum number of entries whose *bytes* are kept in RAM.
        Entries beyond this limit are still indexed (metadata only)
        and will be read back from disk on the next access.
    """
[docs] def __init__( self, cache_dir: str | Path = "media_cache", max_size_mb: int = 500, max_memory_items: int = 64, ) -> None: """Initialize the instance. Args: cache_dir (str | Path): The cache dir value. max_size_mb (int): The max size mb value. max_memory_items (int): The max memory items value. """ self._dir = Path(cache_dir) self._max_bytes = max_size_mb * 1024 * 1024 self._max_memory = max_memory_items # Ordered by last access (most-recent at the end). self._index: OrderedDict[str, _CacheEntry] = OrderedDict() self._total_bytes: int = 0 self._lock = asyncio.Lock() # Stats counters self._hits: int = 0 self._misses: int = 0 # Disk scan deferred to ensure_loaded() so startup is non-blocking self._scan_loaded: bool = False # Ensure the cache directory exists self._dir.mkdir(parents=True, exist_ok=True)
[docs] async def ensure_loaded(self) -> None: """Load the in-memory index from disk (non-blocking). Called during async startup so the sync disk scan does not block the event loop. Idempotent — safe to call multiple times. """ if self._scan_loaded: return await asyncio.to_thread(self._scan_disk) self._scan_loaded = True
# ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] async def get(self, url: str) -> tuple[bytes, str, str] | None: """Return ``(data, mimetype, filename)`` if *url* is cached, else ``None``.""" async with self._lock: entry = self._index.get(url) if entry is None: return None # Promote to most-recently-used self._index.move_to_end(url) entry.last_access = time.time() # If bytes aren't in memory, reload from disk if entry.data is None: entry.data = await self._read_disk(entry.disk_key) if entry.data is None: # File vanished – remove stale index entry self._evict_entry(url) return None self._hits += 1 logger.info( "Media cache HIT: %s (%s, %d bytes)", entry.filename, entry.mimetype, entry.size, ) return entry.data, entry.mimetype, entry.filename
[docs] async def put( self, url: str, data: bytes, mimetype: str, filename: str, ) -> None: """Store media bytes under *url*, writing through to disk.""" async with self._lock: # If already cached, just update access time if url in self._index: self._index.move_to_end(url) self._index[url].last_access = time.time() return disk_key = self._url_to_key(url) entry = _CacheEntry( url=url, mimetype=mimetype, filename=filename, size=len(data), disk_key=disk_key, last_access=time.time(), data=data, ) # Write to disk await self._write_disk(entry, data) self._index[url] = entry self._total_bytes += entry.size # Evict oldest entries if over disk budget await self._enforce_limits() # Shed in-memory bytes for old entries self._shed_memory()
[docs] async def get_or_download( self, url: str, downloader: Downloader, ) -> tuple[bytes, str, str]: """Return cached media or call *downloader* and cache the result. *downloader* is an ``async`` callable returning ``(data, mimetype, filename)``. GIF images are automatically re-encoded as MP4 before being stored so that the cache always contains the video format. """ cached = await self.get(url) if cached is not None: return cached # Cache miss – perform the download (outside the lock) self._misses += 1 data, mimetype, filename = await downloader() # Re-encode GIF → MP4 before caching so both the stored entry # and the returned data use a well-supported video format. from platforms.media_common import maybe_reencode_gif data, mimetype, filename = await maybe_reencode_gif( data, mimetype, filename, ) logger.info( "Media cache MISS: downloading %s (%s, %d bytes)", filename, mimetype, len(data), ) await self.put(url, data, mimetype, filename) return data, mimetype, filename
[docs] def stats(self) -> dict[str, Any]: """Return a snapshot of cache statistics.""" entries_in_memory = sum( 1 for e in self._index.values() if e.data is not None ) return { "entries": len(self._index), "entries_in_memory": entries_in_memory, "total_bytes": self._total_bytes, "total_mb": round(self._total_bytes / (1024 * 1024), 2), "max_mb": round(self._max_bytes / (1024 * 1024), 2), "hits": self._hits, "misses": self._misses, "hit_rate": ( round(self._hits / (self._hits + self._misses), 3) if (self._hits + self._misses) > 0 else 0.0 ), }
# ------------------------------------------------------------------ # Disk I/O helpers # ------------------------------------------------------------------ def _url_to_key(self, url: str) -> str: """Internal helper: url to key. Args: url (str): URL string. Returns: str: Result string. """ return hashlib.sha256(url.encode()).hexdigest() def _dat_path(self, disk_key: str) -> Path: """Internal helper: dat path. Args: disk_key (str): The disk key value. Returns: Path: The result. """ return self._dir / f"{disk_key}.dat" def _meta_path(self, disk_key: str) -> Path: """Internal helper: meta path. Args: disk_key (str): The disk key value. Returns: Path: The result. """ return self._dir / f"{disk_key}.json" async def _write_disk(self, entry: _CacheEntry, data: bytes) -> None: """Write data + metadata sidecar to the cache directory.""" dat_path = self._dat_path(entry.disk_key) meta_path = self._meta_path(entry.disk_key) meta = { "url": entry.url, "mimetype": entry.mimetype, "filename": entry.filename, "size": entry.size, "ts": entry.last_access, } async with aiofiles.open(dat_path, "wb") as f: await f.write(data) async with aiofiles.open(meta_path, "w", encoding="utf-8") as f: await f.write(json.dumps(meta, indent=2)) async def _read_disk(self, disk_key: str) -> bytes | None: """Read cached bytes from disk, or ``None`` if missing.""" dat_path = self._dat_path(disk_key) if not dat_path.exists(): return None try: async with aiofiles.open(dat_path, "rb") as f: return await f.read() except OSError: logger.warning("Failed to read cached file %s", dat_path) return None def _delete_disk(self, disk_key: str) -> None: """Remove data + metadata files from disk.""" for path in (self._dat_path(disk_key), self._meta_path(disk_key)): try: path.unlink(missing_ok=True) except OSError: logger.warning("Could not delete cache file %s", path) # ------------------------------------------------------------------ # Startup disk scan # 
------------------------------------------------------------------ def _scan_disk(self) -> None: """Populate the in-memory index from existing disk cache files. Only metadata is loaded – actual bytes stay on disk until requested. """ count = 0 for meta_path in sorted(self._dir.glob("*.json")): try: raw = meta_path.read_text(encoding="utf-8") meta = json.loads(raw) except (OSError, json.JSONDecodeError): logger.warning("Skipping unreadable cache meta %s", meta_path) continue disk_key = meta_path.stem # filename without .json dat_path = self._dat_path(disk_key) if not dat_path.exists(): # Orphaned metadata – clean up meta_path.unlink(missing_ok=True) continue url = meta["url"] entry = _CacheEntry( url=url, mimetype=meta.get("mimetype", "application/octet-stream"), filename=meta.get("filename", "attachment"), size=meta.get("size", 0) or dat_path.stat().st_size, disk_key=disk_key, last_access=meta.get("ts", 0.0), data=None, # don't load bytes on startup ) self._index[url] = entry self._total_bytes += entry.size count += 1 # Sort by last_access so the OrderedDict is in LRU order if self._index: sorted_items = sorted( self._index.items(), key=lambda kv: kv[1].last_access, ) self._index.clear() for k, v in sorted_items: self._index[k] = v if count: logger.info( "Media cache: loaded %d entries from disk (%.1f MB)", count, self._total_bytes / (1024 * 1024), ) # ------------------------------------------------------------------ # Eviction # ------------------------------------------------------------------ async def _enforce_limits(self) -> None: """Evict oldest entries until total disk size is within budget.""" while self._total_bytes > self._max_bytes and self._index: # Pop the least-recently-used entry (front of OrderedDict) url, entry = next(iter(self._index.items())) self._evict_entry(url) def _evict_entry(self, url: str) -> None: """Remove a single entry from both the index and disk.""" entry = self._index.pop(url, None) if entry is None: return self._total_bytes -= 
entry.size self._delete_disk(entry.disk_key) logger.debug("Media cache: evicted %s (%s)", entry.filename, url) def _shed_memory(self) -> None: """Drop in-memory bytes for the oldest entries to stay within ``max_memory_items``.""" in_memory = [ url for url, e in self._index.items() if e.data is not None ] while len(in_memory) > self._max_memory: oldest_url = in_memory.pop(0) entry = self._index.get(oldest_url) if entry is not None: entry.data = None