# Source code for media_cache
"""Disk-backed LRU media cache.
Caches downloaded media (images, audio, video, files) so that the same
URL is never fetched twice. An in-memory index provides fast lookups
while a configurable disk directory persists data across restarts.
Each cached entry is stored on disk as two files::
{sha256_of_url}.dat – raw media bytes
{sha256_of_url}.json – sidecar metadata (mimetype, filename, url, ts, size)
On startup the disk directory is scanned to rebuild the in-memory index
*without* loading all bytes into RAM.
"""
from __future__ import annotations
import asyncio
import hashlib
import json
import logging
import time
from collections import OrderedDict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Awaitable, Callable
import aiofiles
logger = logging.getLogger(__name__)
# Type alias for an async callable that returns (bytes, mimetype, filename)
Downloader = Callable[[], Awaitable[tuple[bytes, str, str]]]
# ------------------------------------------------------------------
# Internal data types
# ------------------------------------------------------------------
@dataclass
class _CacheEntry:
    """Lightweight index record – bytes are only loaded on demand."""

    url: str            # original media URL (also the cache key)
    mimetype: str       # MIME type reported at download time
    filename: str       # suggested filename for the media
    size: int           # byte length of the media data
    disk_key: str       # sha256 hex used for filenames on disk
    last_access: float  # epoch seconds
    # When populated, holds the raw media bytes (soft in-memory cache).
    # May be ``None`` if the entry was loaded from a disk-only scan.
    data: bytes | None = None
# ------------------------------------------------------------------
# MediaCache
# ------------------------------------------------------------------
class MediaCache:
    """Two-tier (memory + disk) LRU media cache.

    Parameters
    ----------
    cache_dir:
        Directory for persistent storage. Created automatically.
    max_size_mb:
        Approximate cap on total disk usage in megabytes. Oldest
        entries are evicted when the limit is exceeded.
    max_memory_items:
        Maximum number of entries whose *bytes* are kept in RAM.
        Entries beyond this limit are still indexed (metadata only)
        and will be read back from disk on the next access.

    Notes
    -----
    Call :meth:`ensure_loaded` once during async startup to populate the
    index from any files already present in ``cache_dir``.
    """
    def __init__(
        self,
        cache_dir: str | Path = "media_cache",
        max_size_mb: int = 500,
        max_memory_items: int = 64,
    ) -> None:
        """Create the cache and ensure the storage directory exists.

        Args:
            cache_dir (str | Path): Directory used for persistent storage;
                created (with parents) if missing.
            max_size_mb (int): Approximate disk budget in megabytes,
                converted to a byte cap used by ``_enforce_limits``.
            max_memory_items (int): Maximum number of entries that keep
                their raw bytes in RAM (see ``_shed_memory``).
        """
        self._dir = Path(cache_dir)
        self._max_bytes = max_size_mb * 1024 * 1024
        self._max_memory = max_memory_items
        # Ordered by last access (most-recent at the end).
        self._index: OrderedDict[str, _CacheEntry] = OrderedDict()
        self._total_bytes: int = 0
        self._lock = asyncio.Lock()
        # Stats counters
        self._hits: int = 0
        self._misses: int = 0
        # Disk scan deferred to ensure_loaded() so startup is non-blocking
        self._scan_loaded: bool = False
        # Ensure the cache directory exists
        self._dir.mkdir(parents=True, exist_ok=True)
[docs]
async def ensure_loaded(self) -> None:
"""Load the in-memory index from disk (non-blocking).
Called during async startup so the sync disk scan does not block
the event loop. Idempotent — safe to call multiple times.
"""
if self._scan_loaded:
return
await asyncio.to_thread(self._scan_disk)
self._scan_loaded = True
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
[docs]
async def get(self, url: str) -> tuple[bytes, str, str] | None:
"""Return ``(data, mimetype, filename)`` if *url* is cached, else ``None``."""
async with self._lock:
entry = self._index.get(url)
if entry is None:
return None
# Promote to most-recently-used
self._index.move_to_end(url)
entry.last_access = time.time()
# If bytes aren't in memory, reload from disk
if entry.data is None:
entry.data = await self._read_disk(entry.disk_key)
if entry.data is None:
# File vanished – remove stale index entry
self._evict_entry(url)
return None
self._hits += 1
logger.info(
"Media cache HIT: %s (%s, %d bytes)",
entry.filename, entry.mimetype, entry.size,
)
return entry.data, entry.mimetype, entry.filename
[docs]
async def put(
self,
url: str,
data: bytes,
mimetype: str,
filename: str,
) -> None:
"""Store media bytes under *url*, writing through to disk."""
async with self._lock:
# If already cached, just update access time
if url in self._index:
self._index.move_to_end(url)
self._index[url].last_access = time.time()
return
disk_key = self._url_to_key(url)
entry = _CacheEntry(
url=url,
mimetype=mimetype,
filename=filename,
size=len(data),
disk_key=disk_key,
last_access=time.time(),
data=data,
)
# Write to disk
await self._write_disk(entry, data)
self._index[url] = entry
self._total_bytes += entry.size
# Evict oldest entries if over disk budget
await self._enforce_limits()
# Shed in-memory bytes for old entries
self._shed_memory()
[docs]
async def get_or_download(
self,
url: str,
downloader: Downloader,
) -> tuple[bytes, str, str]:
"""Return cached media or call *downloader* and cache the result.
*downloader* is an ``async`` callable returning
``(data, mimetype, filename)``.
GIF images are automatically re-encoded as MP4 before being
stored so that the cache always contains the video format.
"""
cached = await self.get(url)
if cached is not None:
return cached
# Cache miss – perform the download (outside the lock)
self._misses += 1
data, mimetype, filename = await downloader()
# Re-encode GIF → MP4 before caching so both the stored entry
# and the returned data use a well-supported video format.
from platforms.media_common import maybe_reencode_gif
data, mimetype, filename = await maybe_reencode_gif(
data, mimetype, filename,
)
logger.info(
"Media cache MISS: downloading %s (%s, %d bytes)",
filename, mimetype, len(data),
)
await self.put(url, data, mimetype, filename)
return data, mimetype, filename
[docs]
def stats(self) -> dict[str, Any]:
"""Return a snapshot of cache statistics."""
entries_in_memory = sum(
1 for e in self._index.values() if e.data is not None
)
return {
"entries": len(self._index),
"entries_in_memory": entries_in_memory,
"total_bytes": self._total_bytes,
"total_mb": round(self._total_bytes / (1024 * 1024), 2),
"max_mb": round(self._max_bytes / (1024 * 1024), 2),
"hits": self._hits,
"misses": self._misses,
"hit_rate": (
round(self._hits / (self._hits + self._misses), 3)
if (self._hits + self._misses) > 0
else 0.0
),
}
# ------------------------------------------------------------------
# Disk I/O helpers
# ------------------------------------------------------------------
def _url_to_key(self, url: str) -> str:
"""Internal helper: url to key.
Args:
url (str): URL string.
Returns:
str: Result string.
"""
return hashlib.sha256(url.encode()).hexdigest()
def _dat_path(self, disk_key: str) -> Path:
"""Internal helper: dat path.
Args:
disk_key (str): The disk key value.
Returns:
Path: The result.
"""
return self._dir / f"{disk_key}.dat"
def _meta_path(self, disk_key: str) -> Path:
"""Internal helper: meta path.
Args:
disk_key (str): The disk key value.
Returns:
Path: The result.
"""
return self._dir / f"{disk_key}.json"
async def _write_disk(self, entry: _CacheEntry, data: bytes) -> None:
"""Write data + metadata sidecar to the cache directory."""
dat_path = self._dat_path(entry.disk_key)
meta_path = self._meta_path(entry.disk_key)
meta = {
"url": entry.url,
"mimetype": entry.mimetype,
"filename": entry.filename,
"size": entry.size,
"ts": entry.last_access,
}
async with aiofiles.open(dat_path, "wb") as f:
await f.write(data)
async with aiofiles.open(meta_path, "w", encoding="utf-8") as f:
await f.write(json.dumps(meta, indent=2))
async def _read_disk(self, disk_key: str) -> bytes | None:
"""Read cached bytes from disk, or ``None`` if missing."""
dat_path = self._dat_path(disk_key)
if not dat_path.exists():
return None
try:
async with aiofiles.open(dat_path, "rb") as f:
return await f.read()
except OSError:
logger.warning("Failed to read cached file %s", dat_path)
return None
def _delete_disk(self, disk_key: str) -> None:
"""Remove data + metadata files from disk."""
for path in (self._dat_path(disk_key), self._meta_path(disk_key)):
try:
path.unlink(missing_ok=True)
except OSError:
logger.warning("Could not delete cache file %s", path)
# ------------------------------------------------------------------
# Startup disk scan
# ------------------------------------------------------------------
def _scan_disk(self) -> None:
"""Populate the in-memory index from existing disk cache files.
Only metadata is loaded – actual bytes stay on disk until
requested.
"""
count = 0
for meta_path in sorted(self._dir.glob("*.json")):
try:
raw = meta_path.read_text(encoding="utf-8")
meta = json.loads(raw)
except (OSError, json.JSONDecodeError):
logger.warning("Skipping unreadable cache meta %s", meta_path)
continue
disk_key = meta_path.stem # filename without .json
dat_path = self._dat_path(disk_key)
if not dat_path.exists():
# Orphaned metadata – clean up
meta_path.unlink(missing_ok=True)
continue
url = meta["url"]
entry = _CacheEntry(
url=url,
mimetype=meta.get("mimetype", "application/octet-stream"),
filename=meta.get("filename", "attachment"),
size=meta.get("size", 0) or dat_path.stat().st_size,
disk_key=disk_key,
last_access=meta.get("ts", 0.0),
data=None, # don't load bytes on startup
)
self._index[url] = entry
self._total_bytes += entry.size
count += 1
# Sort by last_access so the OrderedDict is in LRU order
if self._index:
sorted_items = sorted(
self._index.items(), key=lambda kv: kv[1].last_access,
)
self._index.clear()
for k, v in sorted_items:
self._index[k] = v
if count:
logger.info(
"Media cache: loaded %d entries from disk (%.1f MB)",
count,
self._total_bytes / (1024 * 1024),
)
# ------------------------------------------------------------------
# Eviction
# ------------------------------------------------------------------
async def _enforce_limits(self) -> None:
"""Evict oldest entries until total disk size is within budget."""
while self._total_bytes > self._max_bytes and self._index:
# Pop the least-recently-used entry (front of OrderedDict)
url, entry = next(iter(self._index.items()))
self._evict_entry(url)
def _evict_entry(self, url: str) -> None:
"""Remove a single entry from both the index and disk."""
entry = self._index.pop(url, None)
if entry is None:
return
self._total_bytes -= entry.size
self._delete_disk(entry.disk_key)
logger.debug("Media cache: evicted %s (%s)", entry.filename, url)
def _shed_memory(self) -> None:
"""Drop in-memory bytes for the oldest entries to stay within
``max_memory_items``."""
in_memory = [
url for url, e in self._index.items() if e.data is not None
]
while len(in_memory) > self._max_memory:
oldest_url = in_memory.pop(0)
entry = self._index.get(oldest_url)
if entry is not None:
entry.data = None