"""Shared media-to-content-part conversion for all platforms.
Converts raw bytes + MIME type into the multimodal content-part format
expected by the OpenRouter chat-completions API. Platform-specific
*download* logic lives in each adapter; this module only handles the
format conversion.
Office / ODF / EPUB documents whose MIME types are not supported by
the downstream LLM are automatically converted to PDF via LibreOffice
headless before being embedded as content parts.
GIF and animated WebP images are automatically re-encoded as MP4
(H.264 baseline) so the Gemini API receives a well-supported video
format instead of GIF/animated-WebP.
"""
from __future__ import annotations
import asyncio
import base64
import logging
import os
import shutil
import tempfile
from pathlib import PurePosixPath
from typing import Any
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Lookup from an audio MIME type to the format string accepted by the
# OpenRouter ``input_audio`` content type.  The table is declared per
# target format, with every MIME alias of that format listed beside it.
_AUDIO_FORMAT_MAP: dict[str, str] = {
    mime: fmt
    for fmt, mimes in (
        ("wav", ("audio/wav", "audio/x-wav", "audio/wave")),
        ("mp3", ("audio/mp3", "audio/mpeg")),
        ("ogg", ("audio/ogg",)),
        ("flac", ("audio/flac", "audio/x-flac")),
        ("aac", ("audio/aac",)),
        ("m4a", ("audio/mp4", "audio/x-m4a", "audio/m4a")),
        ("aiff", ("audio/aiff", "audio/x-aiff")),
    )
    for mime in mimes
}
# ``application/*`` MIME types whose payload is really text.  They are
# relabelled as ``text/plain`` so the LLM receives a supported type.
_TEXT_REMAP_MIME_TYPES: set[str] = set().union(
    # Structured data and configuration formats
    (
        "application/json",
        "application/xml",
        "application/x-yaml",
        "application/yaml",
        "application/toml",
        "application/x-toml",
    ),
    # Source code and shell scripts
    (
        "application/javascript",
        "application/x-javascript",
        "application/typescript",
        "application/x-sh",
        "application/x-shellscript",
    ),
    # Query languages
    (
        "application/sql",
        "application/graphql",
    ),
)
# MIME types that are converted to PDF before being sent to the LLM,
# declared one document family at a time.
_CONVERTIBLE_MIME_TYPES: set[str] = set().union(
    # Word-processing
    (
        "application/msword",
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/rtf",
        "text/rtf",
        "application/vnd.oasis.opendocument.text",
    ),
    # Spreadsheets
    (
        "application/vnd.ms-excel",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-excel.sheet.macroEnabled.12",
        "application/vnd.ms-excel.sheet.binary.macroEnabled.12",
        "application/vnd.oasis.opendocument.spreadsheet",
    ),
    # Presentations
    (
        "application/vnd.ms-powerpoint",
        "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "application/vnd.openxmlformats-officedocument.presentationml.slideshow",
        "application/vnd.ms-powerpoint.presentation.macroEnabled.12",
        "application/vnd.ms-powerpoint.slideshow.macroEnabled.12",
        "application/vnd.oasis.opendocument.presentation",
    ),
    # E-books
    (
        "application/epub+zip",
    ),
)
# Lower-case file extensions that trigger PDF conversion.  This is the
# fallback for when the MIME type is generic (for example
# ``application/octet-stream``) and only the filename identifies the file.
_CONVERTIBLE_EXTENSIONS: set[str] = set().union(
    (".doc", ".docx", ".rtf", ".odt"),                     # word-processing
    (".xls", ".xlsx", ".xlsm", ".xlsb", ".ods"),           # spreadsheets
    (".ppt", ".pptx", ".pptm", ".pps", ".ppsx", ".odp"),   # presentations
    (".epub",),                                            # e-books
)
# ------------------------------------------------------------------
# GIF / animated WebP -> MP4 re-encoding
# ------------------------------------------------------------------
def _is_animated_webp(data: bytes) -> bool:
    """Report whether *data* looks like an animated WebP file.

    Only the extended (``VP8X``) container layout can be animated, so
    anything without a ``RIFF``/``WEBP`` header followed by a ``VP8X``
    chunk is rejected outright.  For extended files the answer is yes
    when the animation bit of the flags byte is set or, failing that,
    when an ``ANMF`` tag appears within the first 4096 bytes.
    """
    has_container_header = (
        len(data) >= 30
        and data[:4] == b"RIFF"
        and data[8:12] == b"WEBP"
    )
    # The first chunk (offset 12) must be VP8X for the extended format.
    uses_extended_format = has_container_header and data[12:16] == b"VP8X"
    if not uses_extended_format:
        return False

    # Offset 20 holds the VP8X flags; mask 0x02 is the animation bit.
    animation_bit_set = bool(data[20] & 0x02)
    # Otherwise fall back to looking for an animation-frame chunk tag.
    return animation_bit_set or b"ANMF" in data[:4096]
def _webp_to_gif_bytes(data: bytes) -> bytes | None:
    """Re-encode an animated WebP as an animated GIF with Pillow.

    Pillow decodes the WebP itself, so this step does not depend on
    ffmpeg having a WebP demuxer; the resulting GIF can then be handed
    to the GIF->MP4 ffmpeg pipeline.

    Returns the GIF bytes, or ``None`` when the image has at most one
    frame, Pillow is not installed, or the conversion raises.
    """
    try:
        from io import BytesIO

        from PIL import Image

        source = Image.open(BytesIO(data))

        frame_count = getattr(source, "n_frames", 1)
        if frame_count <= 1:
            # A single frame means there is nothing to animate.
            return None

        rgba_frames = []
        frame_durations = []
        for index in range(frame_count):
            source.seek(index)
            rgba_frames.append(source.convert("RGBA"))
            # Per-frame display time in ms; 100 ms when the file has none.
            frame_durations.append(source.info.get("duration", 100))

        first_frame, *later_frames = rgba_frames
        output = BytesIO()
        first_frame.save(
            output,
            format="GIF",
            save_all=True,
            append_images=later_frames,
            duration=frame_durations,
            loop=source.info.get("loop", 0),
            disposal=2,  # restore to background between frames
        )
        gif_bytes = output.getvalue()

        logger.info(
            "Pillow WebP->GIF: %d frames, %d bytes -> %d bytes",
            frame_count, len(data), len(gif_bytes),
        )
        return gif_bytes
    except ImportError:
        logger.warning("Pillow not installed -- cannot convert animated WebP")
        return None
    except Exception:
        logger.exception("Pillow WebP->GIF conversion failed")
        return None
async def _convert_gif_to_mp4(
    data: bytes,
    timeout: float = 30.0,
) -> bytes | None:
    """Convert GIF bytes to MP4 (H.264 baseline, yuv420p) via ffmpeg.

    The GIF is written to a private temporary directory, ffmpeg is run
    on it, and the resulting file is read back.  The directory is
    removed before returning.

    Parameters
    ----------
    data:
        The raw GIF bytes.
    timeout:
        Maximum number of seconds to wait for ffmpeg to finish.

    Returns
    -------
    bytes | None
        The MP4 bytes on success, or ``None`` when ffmpeg is not on
        ``PATH``, exits non-zero, produces no output file, times out,
        or any unexpected error occurs.
    """
    ffmpeg_path = shutil.which("ffmpeg")
    if not ffmpeg_path:
        logger.warning("ffmpeg not found -- GIF will not be re-encoded")
        return None

    temp_dir = tempfile.mkdtemp(prefix="gif2mp4_")
    proc = None
    try:
        input_path = os.path.join(temp_dir, "input.gif")
        output_path = os.path.join(temp_dir, "output.mp4")
        with open(input_path, "wb") as f:
            f.write(data)

        proc = await asyncio.create_subprocess_exec(
            ffmpeg_path,
            "-y",
            "-i", input_path,
            # H.264 baseline profile -- widest possible decoder support
            "-c:v", "libx264",
            "-profile:v", "baseline",
            "-level", "3.0",
            "-pix_fmt", "yuv420p",
            # Pad to even dimensions (required by H.264)
            "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
            "-movflags", "+faststart",
            "-an",  # no audio track needed
            output_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
        )
        _, stderr = await asyncio.wait_for(
            proc.communicate(), timeout=timeout,
        )

        if proc.returncode != 0:
            logger.warning(
                "ffmpeg GIF->MP4 failed (rc=%s): %s",
                proc.returncode,
                (stderr or b"").decode(errors="ignore")[:500],
            )
            return None

        if not os.path.exists(output_path):
            logger.warning("ffmpeg produced no output for GIF->MP4")
            return None

        with open(output_path, "rb") as f:
            return f.read()
    except asyncio.TimeoutError:
        logger.error("ffmpeg GIF->MP4 timed out after %.0fs", timeout)
        return None
    except Exception:
        logger.exception("Unexpected error in GIF->MP4 conversion")
        return None
    finally:
        try:
            # wait_for() only cancels communicate(); it does not stop the
            # child.  A process that is still running here (timeout or
            # task cancellation) must be killed and reaped, otherwise
            # ffmpeg keeps running against a directory we are deleting.
            if proc is not None and proc.returncode is None:
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass  # exited between the check and the kill
                else:
                    await proc.wait()
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)
async def maybe_reencode_gif(
    data: bytes,
    mimetype: str,
    filename: str,
) -> tuple[bytes, str, str]:
    """Re-encode GIF or animated WebP as MP4 for the Gemini API.

    For animated WebP: Pillow converts WebP->GIF (no ffmpeg libwebp
    needed), then the GIF is converted to MP4 via ffmpeg.
    For GIF: directly converted to MP4 via ffmpeg.

    Parameters
    ----------
    data:
        The raw file bytes.
    mimetype:
        MIME type of the file.  It is matched case-insensitively and
        any parameters after ``;`` are ignored for detection.
    filename:
        Filename; its extension is used as a second detection signal.

    Returns
    -------
    tuple[bytes, str, str]
        ``(data, mimetype, filename)`` -- either the converted MP4
        (with MIME type ``video/mp4`` and a ``.mp4`` filename) or the
        original inputs unchanged if conversion fails, the input is
        not a GIF, or the WebP is not animated (static WebP passes
        through as a normal image).
    """
    # MIME types are case-insensitive and may carry parameters, so
    # compare on the bare lower-cased "type/subtype" only.  The caller's
    # original string is what gets returned on pass-through.
    essence = (mimetype or "").split(";", 1)[0].strip().lower()
    ext = PurePosixPath(filename).suffix.lower()

    is_gif = essence == "image/gif" or ext == ".gif"
    is_anim_webp = (
        (essence == "image/webp" or ext == ".webp")
        and _is_animated_webp(data)
    )

    if not is_gif and not is_anim_webp:
        return data, mimetype, filename

    # For animated WebP: convert to GIF first via Pillow, then
    # feed into the proven GIF->MP4 pipeline.  This avoids needing
    # ffmpeg compiled with libwebp (which many servers lack).
    gif_data = data
    src_label = "GIF"
    if is_anim_webp:
        src_label = "animated WebP"
        gif_data = _webp_to_gif_bytes(data)
        if gif_data is None:
            logger.warning(
                "WebP->GIF conversion failed for %s -- passing through as-is",
                filename,
            )
            return data, mimetype, filename

    mp4_data = await _convert_gif_to_mp4(gif_data)
    if mp4_data is None:
        logger.warning(
            "%s re-encoding failed for %s -- passing through as-is",
            src_label, filename,
        )
        return data, mimetype, filename

    new_filename = PurePosixPath(filename).stem + ".mp4"
    logger.info(
        "Re-encoded %s -> MP4: %s (%d bytes -> %d bytes)",
        src_label, filename, len(data), len(mp4_data),
    )
    return mp4_data, "video/mp4", new_filename
async def media_to_content_parts(
    data: bytes,
    mimetype: str,
    filename: str,
    body_text: str | None = None,
) -> list[dict[str, Any]]:
    """Build an OpenRouter multimodal content-parts list from raw media.

    Office / ODF documents are transparently converted to PDF via
    LibreOffice so the LLM never sees an unsupported MIME type.
    GIF and animated WebP inputs are first re-encoded as MP4.

    Parameters
    ----------
    data:
        The raw file bytes.
    mimetype:
        MIME type of the file (e.g. ``"image/png"``).  It is
        normalised before use: lower-cased and stripped of any
        ``;``-separated parameters.  An empty value is treated as
        ``application/octet-stream``.
    filename:
        Human-readable filename.
    body_text:
        Optional caption / message body text to include alongside the
        media.  When present it is prepended as a ``text`` content part.

    Returns
    -------
    list[dict]
        A list of content-part dicts suitable for the ``content`` field
        of an OpenRouter user message.
    """
    parts: list[dict[str, Any]] = []
    if body_text:
        parts.append({"type": "text", "text": body_text})

    # MIME types are case-insensitive and may carry parameters (for
    # example "audio/ogg; codecs=opus").  Every lookup below is an exact
    # match against lower-case bare types, so normalise once up front.
    mimetype = (mimetype or "").split(";", 1)[0].strip().lower()
    if not mimetype:
        mimetype = "application/octet-stream"

    # Re-encode GIF / animated WebP -> MP4 before any further
    # processing so the API receives a well-supported video format.
    data, mimetype, filename = await maybe_reencode_gif(
        data, mimetype, filename,
    )

    ext = PurePosixPath(filename).suffix.lower()
    needs_conversion = (
        mimetype in _CONVERTIBLE_MIME_TYPES
        or ext in _CONVERTIBLE_EXTENSIONS
    )

    # Remap application/* types that are really text (e.g. JSON, XML,
    # YAML) so they are sent as ``text/plain`` instead of falling into
    # the unsupported-binary fallback.
    if mimetype in _TEXT_REMAP_MIME_TYPES:
        mimetype = "text/plain"

    if needs_conversion:
        pdf_data = await _convert_to_pdf(data, filename)
        if pdf_data is not None:
            b64 = base64.b64encode(pdf_data).decode("ascii")
            pdf_name = PurePosixPath(filename).stem + ".pdf"
            parts.append({
                "type": "file",
                "file": {
                    "filename": pdf_name,
                    "file_data": f"data:application/pdf;base64,{b64}",
                },
            })
        else:
            # Tell the model the attachment existed rather than
            # forwarding a document type it cannot read.
            parts.append({
                "type": "text",
                "text": (
                    f"[Attachment: {filename} — conversion to PDF failed; "
                    f"the document could not be processed]"
                ),
            })
        return parts

    b64 = base64.b64encode(data).decode("ascii")
    category = mimetype.split("/")[0]

    if category == "image":
        parts.append({
            "type": "image_url",
            "image_url": {
                "url": f"data:{mimetype};base64,{b64}",
            },
        })
    elif category == "audio":
        # Prefer the explicit table; only guess from the filename / MIME
        # subtype when the type is not listed there.
        fmt = (
            _AUDIO_FORMAT_MAP.get(mimetype)
            or _guess_audio_format(filename, mimetype)
        )
        parts.append({
            "type": "input_audio",
            "input_audio": {
                "data": b64,
                "format": fmt,
            },
        })
    elif category == "video":
        parts.append({
            "type": "video_url",
            "video_url": {
                "url": f"data:{mimetype};base64,{b64}",
            },
        })
    elif mimetype == "application/pdf":
        parts.append({
            "type": "file",
            "file": {
                "filename": filename,
                "file_data": f"data:application/pdf;base64,{b64}",
            },
        })
    elif category == "text":
        parts.append({
            "type": "file",
            "file": {
                "filename": filename,
                "file_data": f"data:{mimetype};base64,{b64}",
            },
        })
    else:
        # Unknown binary — try to decode as text; if that fails, send a
        # placeholder so we never forward an unsupported MIME type.
        try:
            data.decode("utf-8")
        except (UnicodeDecodeError, ValueError):
            parts.append({
                "type": "text",
                "text": (
                    f"[Attachment: {filename} (type: {mimetype}) — "
                    f"unsupported file type, contents omitted]"
                ),
            })
        else:
            parts.append({
                "type": "file",
                "file": {
                    "filename": filename,
                    "file_data": f"data:text/plain;base64,{b64}",
                },
            })

    return parts
# ------------------------------------------------------------------
# LibreOffice PDF conversion
# ------------------------------------------------------------------
# Executable names looked up on PATH, in this order, by ``_convert_to_pdf``.
_LIBREOFFICE_BINARIES = ("libreoffice", "soffice", "lowriter")
async def _convert_to_pdf(
    data: bytes,
    filename: str,
    timeout: float = 60.0,
) -> bytes | None:
    """Convert a document to PDF using LibreOffice in headless mode.

    The document is written to a private temporary directory as
    ``input<ext>`` and each name in ``_LIBREOFFICE_BINARIES`` that is
    found on ``PATH`` is tried in turn until one exits successfully.
    The directory is removed before returning.

    Parameters
    ----------
    data:
        The raw document bytes.
    filename:
        Original filename; only its extension is used (to name the
        temporary input file) plus the full name in log messages.
    timeout:
        Maximum number of seconds to wait for a single LibreOffice run.

    Returns
    -------
    bytes | None
        The PDF bytes on success, or ``None`` when no LibreOffice
        binary is available, a run times out, no PDF is produced, or
        any unexpected error occurs.
    """
    temp_dir = tempfile.mkdtemp(prefix="doc2pdf_")
    try:
        input_ext = PurePosixPath(filename).suffix or ".tmp"
        input_path = os.path.join(temp_dir, f"input{input_ext}")
        with open(input_path, "wb") as f:
            f.write(data)

        last_error: Exception | None = None
        found_binary = False
        for binary_name in _LIBREOFFICE_BINARIES:
            binary_path = shutil.which(binary_name)
            if not binary_path:
                continue
            found_binary = True

            try:
                proc = await asyncio.create_subprocess_exec(
                    binary_path,
                    "--headless",
                    "--norestore",
                    "--invisible",
                    "--nodefault",
                    "--nolockcheck",
                    "--nofirststartwizard",
                    "--convert-to", "pdf",
                    "--outdir", temp_dir,
                    input_path,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    stdin=asyncio.subprocess.DEVNULL,
                    cwd=temp_dir,
                )
            except FileNotFoundError as exc:
                # The binary vanished between which() and exec; try the next.
                last_error = exc
                continue

            try:
                _, stderr = await asyncio.wait_for(
                    proc.communicate(), timeout=timeout,
                )
            except asyncio.TimeoutError:
                logger.error(
                    "LibreOffice conversion timed out after %.0fs for %s",
                    timeout, filename,
                )
                return None
            finally:
                # A process still running here means we timed out or were
                # cancelled.  wait_for() does not stop the child, so kill
                # it and reap it to avoid leaving a stray or zombie process.
                if proc.returncode is None:
                    try:
                        proc.kill()
                    except ProcessLookupError:
                        pass  # exited between the check and the kill
                    else:
                        await proc.wait()

            if proc.returncode == 0:
                break

            msg = stderr.decode(errors="ignore") if stderr else ""
            logger.warning(
                "%s conversion failed (rc=%s): %s",
                binary_name, proc.returncode, msg[:500],
            )
            last_error = RuntimeError(
                f"{binary_name} exited with {proc.returncode}",
            )

        if not found_binary:
            logger.warning(
                "LibreOffice not found -- cannot convert %s to PDF", filename,
            )
            return None

        # The input is always named "input<ext>", so the converted file
        # is expected at "input.pdf"; fall back to any PDF in the
        # directory in case the converter chose a different name.
        pdf_path: str | None = os.path.join(temp_dir, "input.pdf")
        if not os.path.exists(pdf_path):
            pdf_path = next(
                (
                    os.path.join(temp_dir, entry)
                    for entry in os.listdir(temp_dir)
                    if entry.lower().endswith(".pdf")
                ),
                None,
            )

        if pdf_path is None:
            logger.error(
                "PDF conversion produced no output for %s: %s",
                filename, last_error,
            )
            return None

        with open(pdf_path, "rb") as f:
            return f.read()
    except Exception:
        logger.exception("Unexpected error converting %s to PDF", filename)
        return None
    finally:
        shutil.rmtree(temp_dir, ignore_errors=True)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _guess_audio_format(filename: str, mimetype: str) -> str:
    """Best-effort guess of the audio format string from filename / MIME.

    Parameters
    ----------
    filename:
        Filename whose extension is checked first.
    mimetype:
        MIME type used as the fallback source.

    Returns
    -------
    str
        The lower-cased file extension when it is one of the known
        audio formats; otherwise the lower-cased MIME subtype with any
        ``;``-separated parameters removed.
    """
    ext = PurePosixPath(filename).suffix.lstrip(".").lower()
    known = (
        "wav", "mp3", "ogg", "flac", "aac",
        "m4a", "aiff", "pcm16", "pcm24",
    )
    if ext in known:
        return ext

    # Last resort: use the MIME subtype.  Parameters are dropped first so
    # "audio/ogg; codecs=opus" yields "ogg", not "ogg; codecs=opus".
    essence = mimetype.split(";", 1)[0]
    return essence.split("/")[-1].strip().lower()