# Source code for media_cache
"""Disk-backed LRU media cache.

Caches downloaded media (images, audio, video, files) so that a URL
already held in the cache is not downloaded again. An in-memory index
provides fast lookups while a configurable disk directory persists data
across restarts.

Each cached entry is stored on disk as two files::

    {sha256_of_url}.dat   – raw media bytes
    {sha256_of_url}.json  – sidecar metadata (mimetype, filename, url, ts, size)

On startup the disk directory is scanned (see ``MediaCache.ensure_loaded``)
to rebuild the in-memory index *without* loading all bytes into RAM.
"""
from __future__ import annotations
import asyncio
import hashlib
import jsonutil as json
import logging
import time
from collections import OrderedDict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Awaitable, Callable
import aiofiles
# Module-level logger; every cache log line in this file goes through it.
logger = logging.getLogger(__name__)
# Type alias for an async callable that takes no arguments and returns
# the triple ``(bytes, mimetype, filename)``.
Downloader = Callable[[], Awaitable[tuple[bytes, str, str]]]
# ------------------------------------------------------------------
# Internal data types
# ------------------------------------------------------------------
@dataclass
class _CacheEntry:
    """Lightweight index record for one cached media item.

    Holds just the metadata needed to locate and describe a cached
    download (``url``, ``mimetype``, ``filename``, ``size``, ``disk_key``,
    ``last_access``) so the in-memory index can stay small. The raw
    ``data`` bytes are an optional soft cache: they are populated by
    :meth:`MediaCache.put` and when :meth:`MediaCache.get` reloads an
    entry from disk, but are ``None`` after the startup scan or once
    :meth:`MediaCache._shed_memory` has released them, in which case the
    bytes are re-read from disk on the next access.

    Instantiated by :meth:`MediaCache.put` and
    :meth:`MediaCache._scan_disk`.
    """

    url: str  # original media URL; also the key in ``MediaCache._index``
    mimetype: str  # MIME type as reported by the downloader / sidecar
    filename: str  # human-readable filename, used in log messages
    size: int  # byte length of the media data
    disk_key: str  # sha256 hex used for filenames on disk
    last_access: float  # epoch seconds
    # When populated, holds the raw media bytes (soft in-memory cache).
    # May be ``None`` if the entry was loaded from a disk-only scan.
    data: bytes | None = None
# ------------------------------------------------------------------
# MediaCache
# ------------------------------------------------------------------
[docs]
class MediaCache:
    """Two-tier (memory + disk) LRU media cache.

    A freshly constructed cache has an empty index; awaiting
    :meth:`ensure_loaded` rebuilds it from the files already present in
    *cache_dir*.

    Parameters
    ----------
    cache_dir:
        Directory for persistent storage. Created automatically.
    max_size_mb:
        Approximate cap on total disk usage in megabytes. The
        least-recently-used entries are evicted when the limit is
        exceeded.
    max_memory_items:
        Maximum number of entries whose *bytes* are kept in RAM.
        Entries beyond this limit are still indexed (metadata only)
        and will be read back from disk on the next access.
    """
[docs]
def __init__(
    self,
    cache_dir: str | Path = "media_cache",
    max_size_mb: int = 500,
    max_memory_items: int = 64,
) -> None:
    """Create an empty cache rooted at *cache_dir*.

    Records the disk budget (converted from megabytes to bytes) and the
    resident-item cap, then allocates the bookkeeping state: an
    ``OrderedDict`` index keyed by URL and kept in LRU order, the
    running byte total, the ``asyncio.Lock`` taken by :meth:`get` and
    :meth:`put`, and the hit/miss counters. The final step creates the
    cache directory (parents included); that ``mkdir`` is the only
    filesystem access performed here, because the index is filled later
    by :meth:`ensure_loaded`.

    According to this module's own notes, instances are built by
    ``gateway_main.py`` and ``web_main.py`` from ``cfg.media_cache_dir``
    and ``cfg.media_cache_max_mb``.

    Args:
        cache_dir (str | Path): Directory that holds the ``.dat`` and
            ``.json`` files; created if it does not exist.
        max_size_mb (int): Approximate on-disk budget in megabytes.
        max_memory_items (int): How many entries may keep their raw
            bytes resident in RAM; the rest stay indexed by metadata.
    """
    bytes_per_mb = 1024 * 1024

    # Storage location and budgets.
    self._dir = Path(cache_dir)
    self._max_bytes = max_size_mb * bytes_per_mb
    self._max_memory = max_memory_items

    # LRU bookkeeping: least-recently-used first, most-recent last.
    self._index: OrderedDict[str, _CacheEntry] = OrderedDict()
    self._total_bytes: int = 0
    self._lock = asyncio.Lock()

    # Counters reported by stats().
    self._hits = self._misses = 0

    # Flipped by ensure_loaded() once the deferred disk scan has run.
    self._scan_loaded = False

    # Make sure the cache directory is there before anything is written.
    self._dir.mkdir(parents=True, exist_ok=True)
[docs]
async def ensure_loaded(self) -> None:
    """Load the in-memory index from disk without blocking the event loop.

    Runs the synchronous :meth:`_scan_disk` in a worker thread via
    ``asyncio.to_thread``. The scan happens at most once: the
    ``_scan_loaded`` flag is re-checked after ``self._lock`` has been
    acquired, so callers that arrive while a scan is still in flight
    wait for it instead of starting a second one (which would add every
    entry's size to ``_total_bytes`` again). Holding the lock also keeps
    :meth:`get` and :meth:`put` from touching the index while the worker
    thread is filling it.

    If the scan raises, the flag stays ``False`` and the exception
    propagates, so a later call tries again.
    """
    # Fast path: nothing to do (and no lock traffic) once loaded.
    if self._scan_loaded:
        return
    async with self._lock:
        # Re-check: another task may have finished the scan while this
        # one was waiting for the lock.
        if self._scan_loaded:
            return
        await asyncio.to_thread(self._scan_disk)
        self._scan_loaded = True
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
[docs]
async def get(self, url: str) -> tuple[bytes, str, str] | None:
    """Look up *url* and return its media triple, or ``None`` on a miss.

    On a hit the entry is moved to the most-recently-used end of the
    index and its ``last_access`` timestamp is refreshed. When the
    entry's bytes are not resident they are re-read through
    :meth:`_read_disk`; if that yields ``None`` (file missing or
    unreadable) the stale entry is evicted and a miss is reported.

    After a successful reload :meth:`_shed_memory` is run so that the
    ``max_memory_items`` cap is honored on the read path as well; without
    it every reloaded entry would stay resident until the next
    :meth:`put`. The bytes are returned from a local reference, so the
    result is valid even if the shed releases this entry's own copy
    (e.g. with a cap of zero).

    Increments the hit counter and logs on success. All work happens
    under ``self._lock``. Per this module's notes the method is normally
    reached through :meth:`get_or_download`.

    Args:
        url (str): The media URL serving as the cache key.

    Returns:
        tuple[bytes, str, str] | None: ``(data, mimetype, filename)`` if
        the URL is cached and its bytes are recoverable, otherwise
        ``None``.
    """
    async with self._lock:
        entry = self._index.get(url)
        if entry is None:
            return None

        # Promote to most-recently-used.
        self._index.move_to_end(url)
        entry.last_access = time.time()

        data = entry.data
        if data is None:
            # Bytes are not resident: reload them from disk.
            data = await self._read_disk(entry.disk_key)
            if data is None:
                # File vanished – remove the stale index entry.
                self._evict_entry(url)
                return None
            entry.data = data
            # A reload adds resident bytes, so re-apply the memory cap.
            # ``data`` keeps the payload alive for the return below.
            self._shed_memory()

        self._hits += 1
        logger.info(
            "Media cache HIT: %s (%s, %d bytes)",
            entry.filename,
            entry.mimetype,
            entry.size,
        )
        return data, entry.mimetype, entry.filename
[docs]
async def put(
    self,
    url: str,
    data: bytes,
    mimetype: str,
    filename: str,
) -> None:
    """Store *data* for *url*, writing through to disk and applying limits.

    A URL that is already indexed is only refreshed: it moves to the
    most-recently-used end and gets a new ``last_access``; the supplied
    bytes are ignored. For a new URL the method builds the index record
    (disk key from :meth:`_url_to_key`, size from ``len(data)``), writes
    the bytes and sidecar with :meth:`_write_disk`, registers the record
    and adds its size to the running total. It then trims the disk
    footprint with :meth:`_enforce_limits` and releases surplus resident
    bytes with :meth:`_shed_memory`. Everything runs under
    ``self._lock``.

    Called by :meth:`get_or_download` after a non-empty download.

    Args:
        url (str): The media URL used as the cache key.
        data (bytes): The raw media bytes to persist.
        mimetype (str): The media MIME type, stored in the sidecar.
        filename (str): A human-readable filename, stored in the sidecar.
    """
    async with self._lock:
        if url in self._index:
            # Known URL: refresh its LRU position and timestamp only.
            known = self._index[url]
            self._index.move_to_end(url)
            known.last_access = time.time()
            return

        key = self._url_to_key(url)
        record = _CacheEntry(
            url=url,
            mimetype=mimetype,
            filename=filename,
            size=len(data),
            disk_key=key,
            last_access=time.time(),
            data=data,
        )

        # Persist first; the index is only updated once the write returned.
        await self._write_disk(record, data)
        self._index[url] = record
        self._total_bytes += record.size

        # Disk budget first, then the in-memory budget.
        await self._enforce_limits()
        self._shed_memory()
[docs]
async def get_or_download(
    self,
    url: str,
    downloader: Downloader,
) -> tuple[bytes, str, str]:
    """Return cached media or call *downloader* and cache the result.

    *downloader* is an ``async`` callable returning
    ``(data, mimetype, filename)``. It is awaited outside the cache
    lock, and only after :meth:`get` reported a miss; each such miss
    increments the miss counter.

    An empty download is returned as-is and never cached. Otherwise the
    payload is passed through ``maybe_reencode_gif`` (from
    ``platforms.media_common``) before being stored, so the stored entry
    and the returned triple are whatever that helper produced.

    Storing is best-effort: if :meth:`put` raises ``OSError`` (for
    example a full disk or a permission problem) the failure is logged
    and the downloaded media is still returned, just uncached.

    Args:
        url (str): The media URL used as the cache key.
        downloader (Downloader): Coroutine function that fetches the
            media on a miss.

    Returns:
        tuple[bytes, str, str]: ``(data, mimetype, filename)``.
    """
    cached = await self.get(url)
    if cached is not None:
        return cached

    # Cache miss – perform the download (outside the lock)
    self._misses += 1
    data, mimetype, filename = await downloader()

    # Never cache an empty/failed download. Caching ``b""`` poisons the
    # cache so every later fetch of this URL returns a broken (empty) image
    # — a persistent "the model can't see the image". Return it un-cached so
    # a re-send re-downloads and the caller can drop/replace it.
    if not data:
        logger.warning(
            "Media download returned empty data for %s (%s); not caching",
            filename,
            mimetype,
        )
        return data, mimetype, filename

    # Re-encode GIF → MP4 before caching so both the stored entry
    # and the returned data use a well-supported video format.
    # Imported here rather than at module top, as in the original code.
    from platforms.media_common import maybe_reencode_gif

    data, mimetype, filename = await maybe_reencode_gif(
        data,
        mimetype,
        filename,
    )
    logger.info(
        "Media cache MISS: downloading %s (%s, %d bytes)",
        filename,
        mimetype,
        len(data),
    )
    try:
        await self.put(url, data, mimetype, filename)
    except OSError:
        # The cache is an optimisation: a failed write must not throw away
        # a download that succeeded.
        logger.warning(
            "Media cache: could not store %s (%s); returning it uncached",
            filename,
            mimetype,
            exc_info=True,
        )
    return data, mimetype, filename
[docs]
def stats(self) -> dict[str, Any]:
    """Return a snapshot of cache statistics for monitoring.

    Everything is computed from in-memory state: no lock is taken and
    nothing is read from disk. ``entries_in_memory`` counts the index
    records whose ``data`` is not ``None``; ``hit_rate`` is
    ``hits / (hits + misses)`` rounded to three places, or ``0.0``
    before any lookup has been counted.

    Per this module's notes the result is served by the admin status
    handler in ``web/bot_admin.py`` under the ``media_cache`` key.

    Returns:
        dict[str, Any]: Keys ``entries``, ``entries_in_memory``,
        ``total_bytes``, ``total_mb``, ``max_mb``, ``hits``, ``misses``
        and ``hit_rate``.
    """
    bytes_per_mb = 1024 * 1024
    resident = len([rec for rec in self._index.values() if rec.data is not None])

    lookups = self._hits + self._misses
    if lookups > 0:
        hit_rate = round(self._hits / lookups, 3)
    else:
        hit_rate = 0.0

    return {
        "entries": len(self._index),
        "entries_in_memory": resident,
        "total_bytes": self._total_bytes,
        "total_mb": round(self._total_bytes / bytes_per_mb, 2),
        "max_mb": round(self._max_bytes / bytes_per_mb, 2),
        "hits": self._hits,
        "misses": self._misses,
        "hit_rate": hit_rate,
    }
# ------------------------------------------------------------------
# Disk I/O helpers
# ------------------------------------------------------------------
def _url_to_key(self, url: str) -> str:
    """Map a media URL to the basename of its files on disk.

    The key is the SHA-256 hex digest of the URL's UTF-8 bytes. It
    depends on nothing but *url*, so a given URL always yields the same
    key. :meth:`put` calls this when it creates a new entry.

    Args:
        url (str): The media URL to hash.

    Returns:
        str: The hexadecimal SHA-256 digest (64 characters).
    """
    hasher = hashlib.sha256()
    hasher.update(url.encode("utf-8"))
    return hasher.hexdigest()
def _dat_path(self, disk_key: str) -> Path:
    """Return the path of the raw-bytes file for *disk_key*.

    The file is ``{disk_key}.dat`` inside the cache directory; its
    sidecar comes from :meth:`_meta_path`. Only a path is built, no
    filesystem access takes place.

    Args:
        disk_key (str): The key naming the entry's files.

    Returns:
        Path: The ``.dat`` path under the cache directory.
    """
    return self._dir.joinpath(f"{disk_key}.dat")
def _meta_path(self, disk_key: str) -> Path:
    """Return the path of the JSON metadata sidecar for *disk_key*.

    The file is ``{disk_key}.json`` inside the cache directory; the
    matching raw-bytes file comes from :meth:`_dat_path`. Only a path is
    built, no filesystem access takes place.

    Args:
        disk_key (str): The key naming the entry's files.

    Returns:
        Path: The ``.json`` sidecar path under the cache directory.
    """
    return self._dir.joinpath(f"{disk_key}.json")
async def _write_disk(self, entry: _CacheEntry, data: bytes) -> None:
    """Persist an entry's bytes and its JSON metadata sidecar to disk.

    Writes *data* to the ``.dat`` file and then a small JSON document
    (``url``, ``mimetype``, ``filename``, ``size``, ``ts``) to the
    ``.json`` sidecar, both through ``aiofiles``. The sidecar is what
    :meth:`_scan_disk` reads to rebuild the index after a restart.

    If either write fails or is cancelled, both files are removed again
    before the exception is re-raised. Otherwise a half-written pair
    would stay on disk: :meth:`_scan_disk` only indexes entries with a
    readable sidecar, so such leftovers would never be counted against
    the disk budget nor evicted.

    Called by :meth:`put`, which holds the lock.

    Args:
        entry (_CacheEntry): The index record supplying the disk key and
            the sidecar fields.
        data (bytes): The raw media bytes to write to the ``.dat`` file.

    Raises:
        OSError: If a file cannot be written (after cleanup).
    """
    dat_path = self._dat_path(entry.disk_key)
    meta_path = self._meta_path(entry.disk_key)
    meta = {
        "url": entry.url,
        "mimetype": entry.mimetype,
        "filename": entry.filename,
        "size": entry.size,
        "ts": entry.last_access,
    }
    try:
        async with aiofiles.open(dat_path, "wb") as f:
            await f.write(data)
        async with aiofiles.open(meta_path, "w", encoding="utf-8") as f:
            await f.write(json.dumps(meta, indent=2))
    except BaseException:
        # Includes task cancellation: never leave a partial .dat/.json
        # pair behind. The original exception is always re-raised.
        self._delete_disk(entry.disk_key)
        raise
async def _read_disk(self, disk_key: str) -> bytes | None:
    """Read an entry's raw bytes back from its ``.dat`` file.

    A file that does not exist yields ``None`` silently. An ``OSError``
    while opening or reading is logged as a warning and also yields
    ``None``, so the caller can treat both cases as a miss. The file is
    read through ``aiofiles``.

    Called by :meth:`get` when an entry's bytes are not resident.

    Args:
        disk_key (str): The key identifying the entry's files.

    Returns:
        bytes | None: The cached bytes, or ``None`` if the file is
        missing or could not be read.
    """
    source = self._dat_path(disk_key)
    if source.exists():
        try:
            async with aiofiles.open(source, "rb") as handle:
                payload = await handle.read()
        except OSError:
            logger.warning("Failed to read cached file %s", source)
        else:
            return payload
    return None
def _delete_disk(self, disk_key: str) -> None:
    """Remove the ``.dat`` and ``.json`` files belonging to *disk_key*.

    A file that is already gone is not an error. Any other ``OSError``
    is logged as a warning and the loop carries on with the next file,
    so a failed cleanup never propagates to the caller. This is a
    synchronous helper; :meth:`_evict_entry` is its caller.

    Args:
        disk_key (str): The key whose files should be removed.
    """
    targets = [self._dat_path(disk_key), self._meta_path(disk_key)]
    for target in targets:
        try:
            target.unlink(missing_ok=True)
        except OSError:
            logger.warning("Could not delete cache file %s", target)
# ------------------------------------------------------------------
# Startup disk scan
# ------------------------------------------------------------------
def _scan_disk(self) -> None:
    """Populate the in-memory index from existing disk cache files.

    Walks every ``*.json`` sidecar in the cache directory (in sorted
    order) and creates one metadata-only index entry per usable sidecar;
    the media bytes stay on disk until requested. Afterwards the index
    is re-ordered by ``last_access`` so that it is in LRU order.

    A single bad sidecar never aborts the scan:

    * unreadable, non-UTF-8 or non-JSON sidecars are skipped with a
      warning;
    * a sidecar whose ``.dat`` file is missing is deleted;
    * a sidecar that is not a JSON object, or has no string ``url``, is
      skipped with a warning;
    * a missing, non-integer or non-positive ``size`` falls back to the
      ``.dat`` file's size on disk, and a non-numeric ``ts`` to ``0.0``.

    If a URL is already indexed (for instance when the scan runs a second
    time) the old record's size is subtracted before the new one is
    added, so ``_total_bytes`` is not counted twice.

    Synchronous; :meth:`ensure_loaded` runs it in a worker thread.
    """
    count = 0
    for meta_path in sorted(self._dir.glob("*.json")):
        try:
            meta = json.loads(meta_path.read_text(encoding="utf-8"))
        except (OSError, ValueError, json.JSONDecodeError):
            # ValueError also covers UnicodeDecodeError from read_text().
            logger.warning("Skipping unreadable cache meta %s", meta_path)
            continue

        disk_key = meta_path.stem  # filename without .json
        dat_path = self._dat_path(disk_key)
        if not dat_path.exists():
            # Orphaned metadata – clean up
            meta_path.unlink(missing_ok=True)
            continue

        url = meta.get("url") if isinstance(meta, dict) else None
        if not isinstance(url, str):
            logger.warning("Skipping malformed cache meta %s", meta_path)
            continue

        size = meta.get("size")
        if not isinstance(size, int) or size <= 0:
            # No usable size recorded: trust the file on disk instead.
            try:
                size = dat_path.stat().st_size
            except OSError:
                logger.warning("Skipping unreadable cache file %s", dat_path)
                continue

        last_access = meta.get("ts", 0.0)
        if not isinstance(last_access, (int, float)):
            # Keeps the LRU sort below comparable.
            last_access = 0.0

        entry = _CacheEntry(
            url=url,
            mimetype=meta.get("mimetype", "application/octet-stream"),
            filename=meta.get("filename", "attachment"),
            size=size,
            disk_key=disk_key,
            last_access=last_access,
            data=None,  # don't load bytes on startup
        )

        previous = self._index.get(url)
        if previous is not None:
            # Replacing a record: do not count its bytes twice.
            self._total_bytes -= previous.size
        self._index[url] = entry
        self._total_bytes += entry.size
        count += 1

    # Sort by last_access so the OrderedDict is in LRU order
    if self._index:
        ordered = sorted(
            self._index.items(),
            key=lambda kv: kv[1].last_access,
        )
        self._index.clear()
        self._index.update(ordered)

    if count:
        logger.info(
            "Media cache: loaded %d entries from disk (%.1f MB)",
            count,
            self._total_bytes / (1024 * 1024),
        )
# ------------------------------------------------------------------
# Eviction
# ------------------------------------------------------------------
async def _enforce_limits(self) -> None:
    """Evict least-recently-used entries until disk usage fits the budget.

    As long as the index is non-empty and ``self._total_bytes`` exceeds
    ``self._max_bytes``, the first key of the LRU ``OrderedDict`` (the
    least recently used one) is handed to :meth:`_evict_entry`, which
    drops the record and deletes its files.

    Awaited by :meth:`put` after a new entry has been added, with the
    lock already held.
    """
    while self._index and self._total_bytes > self._max_bytes:
        # Front of the OrderedDict == least recently used.
        lru_url = next(iter(self._index))
        self._evict_entry(lru_url)
def _evict_entry(self, url: str) -> None:
    """Drop the entry for *url* from the index and delete its files.

    Does nothing when *url* is not indexed. Otherwise the record is
    removed, its size is subtracted from ``self._total_bytes``, its
    ``.dat``/``.json`` files are deleted through :meth:`_delete_disk`
    and the eviction is logged at debug level.

    Called by :meth:`get` (backing file vanished) and by
    :meth:`_enforce_limits` (disk budget exceeded).

    Args:
        url (str): The cache key of the entry to remove.
    """
    try:
        removed = self._index.pop(url)
    except KeyError:
        # Not indexed: nothing to evict.
        return

    self._total_bytes -= removed.size
    self._delete_disk(removed.disk_key)
    logger.debug("Media cache: evicted %s (%s)", removed.filename, url)
def _shed_memory(self) -> None:
    """Release resident bytes of the oldest entries to honor the memory cap.

    Collects, in LRU order, the entries whose ``data`` is currently
    held in RAM. If there are more of them than ``self._max_memory``,
    ``data`` is set to ``None`` on the surplus, starting with the least
    recently used. The entries stay indexed and their files stay on
    disk, so :meth:`get` can reload the bytes through :meth:`_read_disk`.
    No I/O happens here.

    The surplus is computed once and taken as a slice, which avoids
    repeatedly popping from the front of a list and also makes a cap of
    zero or below simply release everything.

    Called by :meth:`put` after writing a new entry.
    """
    resident = [rec for rec in self._index.values() if rec.data is not None]
    surplus = len(resident) - self._max_memory
    if surplus <= 0:
        return
    # ``resident`` follows index order, so the first ``surplus`` records
    # are the least recently used ones.
    for rec in resident[:surplus]:
        rec.data = None