# Source code for platforms.media_common

"""Shared media-to-content-part conversion for all platforms.

Converts raw bytes + MIME type into the multimodal content-part format
expected by the OpenRouter chat-completions API.  Platform-specific
*download* logic lives in each adapter; this module only handles the
format conversion.

Office / ODF / EPUB documents whose MIME types are not supported by
the downstream LLM are automatically converted to PDF via LibreOffice
headless before being embedded as content parts.

GIF and animated WebP images are automatically re-encoded as MP4
(H.264 baseline) so the Gemini API receives a well-supported video
format instead of GIF/animated-WebP.
"""

from __future__ import annotations

import asyncio
import base64
import logging
import os
import shutil
import tempfile
from pathlib import PurePosixPath
from typing import Any

logger = logging.getLogger(__name__)

# Lookup table from audio MIME type to the format string used in the
# OpenRouter ``input_audio`` content part.  Each row names one format
# followed by every MIME spelling that resolves to it.
_AUDIO_FORMAT_MAP: dict[str, str] = {
    mime: fmt
    for fmt, mimes in (
        ("wav", ("audio/wav", "audio/x-wav", "audio/wave")),
        ("mp3", ("audio/mp3", "audio/mpeg")),
        ("ogg", ("audio/ogg",)),
        ("flac", ("audio/flac", "audio/x-flac")),
        ("aac", ("audio/aac",)),
        ("m4a", ("audio/mp4", "audio/x-m4a", "audio/m4a")),
        ("aiff", ("audio/aiff", "audio/x-aiff")),
    )
    for mime in mimes
}

# ``application/*`` MIME types whose payload is really plain text.  They
# are relabelled as ``text/plain`` so the LLM receives a supported type.
_TEXT_REMAP_MIME_TYPES: set[str] = {
    # Structured data / configuration
    "application/json",
    "application/xml",
    "application/yaml",
    "application/x-yaml",
    "application/toml",
    "application/x-toml",
    # Source code and scripts
    "application/javascript",
    "application/x-javascript",
    "application/typescript",
    "application/x-sh",
    "application/x-shellscript",
    # Query languages
    "application/sql",
    "application/graphql",
}

# Document MIME types that are converted to PDF before being sent to
# the LLM, grouped by format family.
_CONVERTIBLE_MIME_TYPES: set[str] = {
    # Legacy Microsoft Office binaries
    "application/msword",
    "application/vnd.ms-excel",
    "application/vnd.ms-powerpoint",
    # Office Open XML
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "application/vnd.openxmlformats-officedocument.presentationml.slideshow",
    # Macro-enabled / binary Office variants
    "application/vnd.ms-excel.sheet.macroEnabled.12",
    "application/vnd.ms-excel.sheet.binary.macroEnabled.12",
    "application/vnd.ms-powerpoint.presentation.macroEnabled.12",
    "application/vnd.ms-powerpoint.slideshow.macroEnabled.12",
    # OpenDocument (text, spreadsheet, presentation)
    "application/vnd.oasis.opendocument.text",
    "application/vnd.oasis.opendocument.spreadsheet",
    "application/vnd.oasis.opendocument.presentation",
    # Rich Text Format
    "application/rtf",
    "text/rtf",
    # E-books
    "application/epub+zip",
}

# Filename extensions (lower-case, leading dot) that trigger PDF
# conversion.  This is the fallback for files that arrive with a generic
# MIME type such as ``application/octet-stream``.
_CONVERTIBLE_EXTENSIONS: set[str] = {
    # Word-processing
    ".doc",
    ".docx",
    ".rtf",
    ".odt",
    # Spreadsheets
    ".xls",
    ".xlsx",
    ".xlsm",
    ".xlsb",
    ".ods",
    # Presentations
    ".ppt",
    ".pptx",
    ".pptm",
    ".pps",
    ".ppsx",
    ".odp",
    # E-books
    ".epub",
}

# ------------------------------------------------------------------
# GIF / animated WebP -> MP4 re-encoding
# ------------------------------------------------------------------

def _is_animated_webp(data: bytes) -> bool:
    """Return ``True`` when *data* looks like an animated WebP file.

    The buffer must be at least 30 bytes long, carry the
    ``RIFF....WEBP`` container signature and start with a ``VP8X``
    (extended format) chunk.  It is then treated as animated when the
    animation bit of the VP8X flags byte is set, or when an ``ANMF``
    (animation frame) chunk tag appears within the first 4096 bytes.
    Anything else -- including simple VP8 / VP8L files -- yields
    ``False``.
    """
    has_webp_container = (
        len(data) >= 30
        and data[:4] == b"RIFF"
        and data[8:12] == b"WEBP"
    )
    # The extended-format chunk tag sits right after the 12-byte header.
    if not (has_webp_container and data[12:16] == b"VP8X"):
        return False

    # Offset 20 holds the VP8X flags byte; 0x02 is the animation bit.
    animation_flag_set = bool(data[20] & 0x02)
    # Fall back to looking for an animation-frame chunk near the start.
    return animation_flag_set or b"ANMF" in data[:4096]


def _webp_to_gif_bytes(data: bytes) -> bytes | None:
    """Convert an animated WebP to GIF bytes using Pillow.

    Every frame is decoded with Pillow, converted to RGBA and written
    back out as a multi-frame GIF, so the GIF->MP4 ffmpeg pipeline can
    take over without ffmpeg needing a WebP demuxer.

    Parameters
    ----------
    data:
        Raw bytes of the WebP file.

    Returns
    -------
    bytes | None
        The encoded GIF, or ``None`` when Pillow is not installed, the
        image has a single frame, or decoding / encoding fails.  No
        exception escapes this function; failures are logged instead.
    """
    try:
        import io

        from PIL import Image

        frames = []
        durations = []
        # The context manager releases the source image as soon as all
        # frames have been copied out (``convert`` returns new images).
        with Image.open(io.BytesIO(data)) as img:
            n_frames = getattr(img, "n_frames", 1)
            if n_frames <= 1:
                # Single frame -- not actually animated, bail
                return None

            for index in range(n_frames):
                img.seek(index)
                frames.append(img.convert("RGBA"))
                # Per-frame duration in ms; default to 100ms if absent
                durations.append(img.info.get("duration", 100))

            loop_count = img.info.get("loop", 0)

        buf = io.BytesIO()
        frames[0].save(
            buf,
            format="GIF",
            save_all=True,
            append_images=frames[1:],
            duration=durations,
            loop=loop_count,
            disposal=2,  # restore to background
        )
        gif_data = buf.getvalue()
        logger.info(
            "Pillow WebP->GIF: %d frames, %d bytes -> %d bytes",
            n_frames, len(data), len(gif_data),
        )
        return gif_data

    except ImportError:
        logger.warning("Pillow not installed -- cannot convert animated WebP")
        return None
    except Exception:
        # Best-effort conversion: the caller falls back to the original
        # bytes, so log the failure rather than propagate it.
        logger.error("Pillow WebP->GIF conversion failed", exc_info=True)
        return None


async def _convert_gif_to_mp4(
    data: bytes,
    timeout: float = 30.0,
) -> bytes | None:
    """Convert GIF bytes to MP4 (H.264 baseline, yuv420p) via ffmpeg.

    The GIF is written to a private temporary directory, ffmpeg is run
    on it, and the resulting file is read back.  The temporary
    directory is removed before returning.

    Parameters
    ----------
    data:
        Raw bytes of the GIF file.
    timeout:
        Maximum number of seconds to wait for ffmpeg.  When it is
        exceeded the ffmpeg process is killed and reaped.

    Returns
    -------
    bytes | None
        The MP4 bytes on success, or ``None`` when ffmpeg is missing,
        exits non-zero, produces no output, times out, or any other
        error occurs (failures are logged, never raised).
    """
    ffmpeg_path = shutil.which("ffmpeg")
    if not ffmpeg_path:
        logger.warning("ffmpeg not found -- GIF will not be re-encoded")
        return None

    temp_dir = tempfile.mkdtemp(prefix="gif2mp4_")
    proc = None
    try:
        input_path = os.path.join(temp_dir, "input.gif")
        output_path = os.path.join(temp_dir, "output.mp4")

        with open(input_path, "wb") as f:
            f.write(data)

        proc = await asyncio.create_subprocess_exec(
            ffmpeg_path,
            "-y",
            "-i", input_path,
            # H.264 baseline profile -- widest possible decoder support
            "-c:v", "libx264",
            "-profile:v", "baseline",
            "-level", "3.0",
            "-pix_fmt", "yuv420p",
            # Pad to even dimensions (required by H.264)
            "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
            "-movflags", "+faststart",
            "-an",  # no audio track needed
            output_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
        )
        _, stderr = await asyncio.wait_for(
            proc.communicate(), timeout=timeout,
        )
        if proc.returncode != 0:
            logger.warning(
                "ffmpeg GIF->MP4 failed (rc=%s): %s",
                proc.returncode,
                (stderr or b"").decode(errors="ignore")[:500],
            )
            return None

        if not os.path.exists(output_path):
            logger.warning("ffmpeg produced no output for GIF->MP4")
            return None

        with open(output_path, "rb") as f:
            return f.read()
    except asyncio.TimeoutError:
        logger.error("ffmpeg GIF->MP4 timed out after %.0fs", timeout)
        # ``wait_for`` only cancels ``communicate()``; the child keeps
        # running unless it is killed explicitly.  Kill and reap it so
        # it does not outlive its temp directory.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the timeout and the kill
            await proc.wait()
        return None
    except Exception:
        logger.error("Unexpected error in GIF->MP4 conversion", exc_info=True)
        return None
    finally:
        # Best-effort cleanup; a failure to delete must not mask the result.
        try:
            shutil.rmtree(temp_dir)
        except OSError:
            pass


async def maybe_reencode_gif(
    data: bytes,
    mimetype: str,
    filename: str,
) -> tuple[bytes, str, str]:
    """Re-encode a GIF or animated WebP as MP4 for the Gemini API.

    Animated WebP input is first turned into a GIF with Pillow (so
    ffmpeg does not need libwebp); that GIF -- or the original data
    when the input already is a GIF -- is then converted to MP4 via
    ffmpeg.

    Returns ``(data, mimetype, filename)``: the MP4 bytes with
    ``"video/mp4"`` and a ``.mp4`` filename on success, otherwise the
    three inputs unchanged.  Inputs pass through untouched when they are
    neither GIF nor animated WebP (a static WebP stays a normal image)
    or when any conversion step fails.
    """
    passthrough = (data, mimetype, filename)
    suffix = PurePosixPath(filename).suffix.lower()
    webp_candidate = mimetype == "image/webp" or suffix == ".webp"

    if webp_candidate and _is_animated_webp(data):
        # Go WebP -> GIF via Pillow first, then reuse the GIF -> MP4
        # pipeline; this avoids depending on an ffmpeg built with
        # libwebp (which many servers lack).
        source_kind = "animated WebP"
        gif_bytes = _webp_to_gif_bytes(data)
        if gif_bytes is None:
            logger.warning(
                "WebP->GIF conversion failed for %s -- passing through as-is",
                filename,
            )
            return passthrough
    elif mimetype == "image/gif" or suffix == ".gif":
        source_kind = "GIF"
        gif_bytes = data
    else:
        return passthrough

    mp4_bytes = await _convert_gif_to_mp4(gif_bytes)
    if mp4_bytes is None:
        logger.warning(
            "%s re-encoding failed for %s -- passing through as-is",
            source_kind, filename,
        )
        return passthrough

    logger.info(
        "Re-encoded %s -> MP4: %s (%d bytes -> %d bytes)",
        source_kind, filename, len(data), len(mp4_bytes),
    )
    return mp4_bytes, "video/mp4", PurePosixPath(filename).stem + ".mp4"


async def media_to_content_parts(
    data: bytes,
    mimetype: str,
    filename: str,
    body_text: str | None = None,
) -> list[dict[str, Any]]:
    """Build an OpenRouter multimodal content-parts list from raw media.

    GIF / animated WebP input is first re-encoded to MP4.  Office / ODF
    documents are transparently converted to PDF via LibreOffice so the
    LLM never sees an unsupported MIME type.  MIME-type lookups ignore
    case and any ``;``-separated parameters (so
    ``"audio/ogg; codecs=opus"`` is handled as ``audio/ogg``); data
    URLs still carry the MIME string as supplied.

    Parameters
    ----------
    data:
        The raw file bytes.
    mimetype:
        MIME type of the file (e.g. ``"image/png"``).
    filename:
        Human-readable filename.
    body_text:
        Optional caption / message body text to include alongside the
        media.  When present it is prepended as a ``text`` content part.

    Returns
    -------
    list[dict]
        A list of content-part dicts suitable for the ``content`` field
        of an OpenRouter user message.  Documents whose PDF conversion
        fails, and binary files of unknown type, are represented by a
        ``text`` placeholder part instead of the file contents.
    """
    parts: list[dict[str, Any]] = []

    if body_text:
        parts.append({"type": "text", "text": body_text})

    # Re-encode GIF / animated WebP -> MP4 before any further
    # processing so the API receives a well-supported video format.
    data, mimetype, filename = await maybe_reencode_gif(
        data, mimetype, filename,
    )

    # MIME types are case-insensitive and may carry parameters
    # ("audio/ogg; codecs=opus").  Use the bare lower-case type for all
    # table lookups and branching so such inputs are not mis-routed.
    base_type = mimetype.split(";", 1)[0].strip().lower()

    ext = PurePosixPath(filename).suffix.lower()
    needs_conversion = (
        base_type in _CONVERTIBLE_MIME_TYPES
        or ext in _CONVERTIBLE_EXTENSIONS
    )

    # Remap application/* types that are really text (e.g. JSON, XML,
    # YAML) so they are sent as ``text/plain`` instead of falling into
    # the unsupported-binary fallback.
    if base_type in _TEXT_REMAP_MIME_TYPES:
        mimetype = base_type = "text/plain"

    if needs_conversion:
        pdf_data = await _convert_to_pdf(data, filename)
        if pdf_data is not None:
            b64 = base64.b64encode(pdf_data).decode("ascii")
            pdf_name = PurePosixPath(filename).stem + ".pdf"
            parts.append({
                "type": "file",
                "file": {
                    "filename": pdf_name,
                    "file_data": f"data:application/pdf;base64,{b64}",
                },
            })
        else:
            parts.append({
                "type": "text",
                "text": (
                    f"[Attachment: {filename} — conversion to PDF failed; "
                    f"the document could not be processed]"
                ),
            })
        return parts

    b64 = base64.b64encode(data).decode("ascii")
    category = base_type.split("/")[0]

    if category == "image":
        parts.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:{mimetype};base64,{b64}",
            },
        })
    elif category == "audio":
        # Prefer the explicit table; only guess from the filename /
        # subtype when the MIME type is not listed.
        fmt = _AUDIO_FORMAT_MAP.get(base_type)
        if fmt is None:
            fmt = _guess_audio_format(filename, base_type)
        parts.append({
            "type": "input_audio",
            "input_audio": {
                "data": b64,
                "format": fmt,
            },
        })
    elif category == "video":
        parts.append({
            "type": "video_url",
            "video_url": {
                "url": f"data:{mimetype};base64,{b64}",
            },
        })
    elif base_type == "application/pdf":
        parts.append({
            "type": "file",
            "file": {
                "filename": filename,
                "file_data": f"data:application/pdf;base64,{b64}",
            },
        })
    elif category == "text":
        parts.append({
            "type": "file",
            "file": {
                "filename": filename,
                "file_data": f"data:{mimetype};base64,{b64}",
            },
        })
    else:
        # Unknown binary — try to decode as text; if that fails, send a
        # placeholder so we never forward an unsupported MIME type.
        try:
            data.decode("utf-8")
            parts.append({
                "type": "file",
                "file": {
                    "filename": filename,
                    "file_data": f"data:text/plain;base64,{b64}",
                },
            })
        except (UnicodeDecodeError, ValueError):
            parts.append({
                "type": "text",
                "text": (
                    f"[Attachment: {filename} (type: {mimetype}) — "
                    f"unsupported file type, contents omitted]"
                ),
            })

    return parts


# ------------------------------------------------------------------
# LibreOffice PDF conversion
# ------------------------------------------------------------------

# Executable names tried in order; the first one found on PATH that
# exits successfully wins.
_LIBREOFFICE_BINARIES = ("libreoffice", "soffice", "lowriter")


async def _convert_to_pdf(
    data: bytes,
    filename: str,
    timeout: float = 60.0,
) -> bytes | None:
    """Convert a document to PDF using LibreOffice in headless mode.

    The document is written to a private temporary directory and each
    entry of ``_LIBREOFFICE_BINARIES`` that exists on ``PATH`` is tried
    in turn until one exits with status 0.  The temporary directory is
    removed before returning.

    Parameters
    ----------
    data:
        Raw bytes of the source document.
    filename:
        Original filename; only its suffix is used (to name the
        temporary input file) and the full name appears in log messages.
    timeout:
        Maximum number of seconds to wait for a single LibreOffice run.
        On timeout the process is killed and reaped and no further
        binaries are tried.

    Returns
    -------
    bytes | None
        The PDF bytes on success, or ``None`` on failure (no binary
        available, every attempt failed, timeout, or no PDF produced).
        Failures are logged, never raised.
    """
    temp_dir = tempfile.mkdtemp(prefix="doc2pdf_")
    try:
        input_ext = PurePosixPath(filename).suffix or ".tmp"
        input_path = os.path.join(temp_dir, f"input{input_ext}")

        with open(input_path, "wb") as f:
            f.write(data)

        last_error: Exception | None = None
        attempted = False
        for binary_name in _LIBREOFFICE_BINARIES:
            binary_path = shutil.which(binary_name)
            if not binary_path:
                continue

            attempted = True
            proc = None
            try:
                proc = await asyncio.create_subprocess_exec(
                    binary_path,
                    "--headless",
                    "--norestore",
                    "--invisible",
                    "--nodefault",
                    "--nolockcheck",
                    "--nofirststartwizard",
                    "--convert-to", "pdf",
                    "--outdir", temp_dir,
                    input_path,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    stdin=asyncio.subprocess.DEVNULL,
                    cwd=temp_dir,
                )
                _, stderr = await asyncio.wait_for(
                    proc.communicate(), timeout=timeout,
                )
                if proc.returncode == 0:
                    break

                msg = stderr.decode(errors="ignore") if stderr else ""
                logger.warning(
                    "%s conversion failed (rc=%s): %s",
                    binary_name, proc.returncode, msg[:500],
                )
                last_error = RuntimeError(
                    f"{binary_name} exited with {proc.returncode}",
                )
            except FileNotFoundError as exc:
                # Binary vanished between ``which`` and exec; try the next.
                last_error = exc
                continue
            except asyncio.TimeoutError:
                logger.error(
                    "LibreOffice conversion timed out after %.0fs for %s",
                    timeout, filename,
                )
                # Kill *and* reap the child so it neither lingers as a
                # zombie nor keeps writing into the directory that is
                # about to be removed.
                if proc is not None and proc.returncode is None:
                    try:
                        proc.kill()
                    except ProcessLookupError:
                        pass  # exited between the timeout and the kill
                    await proc.wait()
                return None

        if not attempted:
            # Make the "no output" log below say why nothing was produced.
            last_error = FileNotFoundError(
                "no LibreOffice binary found on PATH",
            )

        expected_pdf = os.path.join(
            temp_dir,
            PurePosixPath(input_path).stem + ".pdf",
        )
        pdf_path = expected_pdf if os.path.exists(expected_pdf) else None
        if not pdf_path:
            # Fall back to any PDF that appeared in the output directory.
            for entry in os.listdir(temp_dir):
                if entry.lower().endswith(".pdf"):
                    pdf_path = os.path.join(temp_dir, entry)
                    break

        if not pdf_path or not os.path.exists(pdf_path):
            logger.error(
                "PDF conversion produced no output for %s: %s",
                filename, last_error,
            )
            return None

        with open(pdf_path, "rb") as f:
            return f.read()
    except Exception:
        logger.error(
            "Unexpected error converting %s to PDF",
            filename, exc_info=True,
        )
        return None
    finally:
        # Best-effort cleanup; a failure to delete must not mask the result.
        try:
            shutil.rmtree(temp_dir)
        except OSError:
            pass


# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------

def _guess_audio_format(filename: str, mimetype: str) -> str:
    """Best-effort guess of the audio format string from filename / MIME.

    The filename extension wins when it is one of the known format
    names.  Otherwise the MIME subtype is returned, lower-cased and with
    any ``;``-separated parameters removed (``"audio/ogg; codecs=opus"``
    yields ``"ogg"``).
    """
    ext = PurePosixPath(filename).suffix.lstrip(".").lower()
    known = (
        "wav", "mp3", "ogg", "flac", "aac", "m4a", "aiff", "pcm16", "pcm24",
    )
    if ext in known:
        return ext
    # Last resort: use the MIME subtype.  Drop parameters first so the
    # result is a bare format name rather than "ogg; codecs=opus".
    bare_type = mimetype.split(";", 1)[0]
    return bare_type.split("/")[-1].strip().lower()