"""Shared media-to-content-part conversion for all platforms.
Converts raw bytes + MIME type into the multimodal content-part format
expected by the OpenRouter chat-completions API. Platform-specific
*download* logic lives in each adapter; this module only handles the
format conversion.
Office / ODF / EPUB documents whose MIME types are not supported by
the downstream LLM are automatically converted to PDF via LibreOffice
headless before being embedded as content parts.
GIF and animated WebP images are automatically re-encoded as MP4
(H.264 baseline) so the Gemini API receives a well-supported video
format instead of GIF/animated-WebP.
"""
from __future__ import annotations
import asyncio
import base64
import io
import logging
import os
import random
import shutil
import tempfile
from pathlib import PurePosixPath
from typing import Any
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Maps broad MIME categories / specific types to audio format strings
# accepted by the OpenRouter ``input_audio`` content type.
_AUDIO_FORMAT_MAP: dict[str, str] = {
"audio/wav": "wav",
"audio/x-wav": "wav",
"audio/wave": "wav",
"audio/mp3": "mp3",
"audio/mpeg": "mp3",
"audio/ogg": "ogg",
"audio/flac": "flac",
"audio/x-flac": "flac",
"audio/aac": "aac",
"audio/mp4": "m4a",
"audio/x-m4a": "m4a",
"audio/m4a": "m4a",
"audio/aiff": "aiff",
"audio/x-aiff": "aiff",
}
# Application/* MIME types that are really text and should be sent as
# ``text/plain`` so the LLM receives a supported type.
_TEXT_REMAP_MIME_TYPES: set[str] = {
"application/json",
"application/xml",
"application/javascript",
"application/x-javascript",
"application/typescript",
"application/x-yaml",
"application/yaml",
"application/x-sh",
"application/x-shellscript",
"application/toml",
"application/x-toml",
"application/sql",
"application/graphql",
}
# MIME types that should be converted to PDF before sending to the LLM.
_CONVERTIBLE_MIME_TYPES: set[str] = {
# Word-processing
"application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
"application/rtf",
"text/rtf",
"application/vnd.oasis.opendocument.text",
# Spreadsheets
"application/vnd.ms-excel",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
"application/vnd.ms-excel.sheet.macroEnabled.12",
"application/vnd.ms-excel.sheet.binary.macroEnabled.12",
"application/vnd.oasis.opendocument.spreadsheet",
# Presentations
"application/vnd.ms-powerpoint",
"application/vnd.openxmlformats-officedocument.presentationml.presentation",
"application/vnd.openxmlformats-officedocument.presentationml.slideshow",
"application/vnd.ms-powerpoint.presentation.macroEnabled.12",
"application/vnd.ms-powerpoint.slideshow.macroEnabled.12",
"application/vnd.oasis.opendocument.presentation",
# E-books
"application/epub+zip",
}
# File extensions that should trigger PDF conversion (fallback when MIME
# type is generic like ``application/octet-stream``).
_CONVERTIBLE_EXTENSIONS: set[str] = {
".doc",
".docx",
".rtf",
".odt",
".xls",
".xlsx",
".xlsm",
".xlsb",
".ods",
".ppt",
".pptx",
".pptm",
".pps",
".ppsx",
".odp",
".epub",
}
# ------------------------------------------------------------------
# CDN download retry
# ------------------------------------------------------------------
#: HTTP status codes that mean "don't bother retrying" — the resource is
#: genuinely gone or forbidden, so hammering the CDN only wastes time.
_PERMANENT_DOWNLOAD_STATUSES: frozenset[int] = frozenset({400, 401, 403, 404, 410})
#: Default backoff knobs for :func:`download_with_retry` (the attempt count is
#: configurable per-call; these shape the delay between attempts).
_RETRY_BASE_DELAY_SECONDS: float = 0.5
_RETRY_MAX_DELAY_SECONDS: float = 8.0
def _is_permanent_download_error(exc: BaseException) -> bool:
"""Return ``True`` when *exc* represents a non-retryable download failure.
A transient CDN blip (timeout, connection reset, 5xx, 429) should be
retried; a genuine 404/403/410 should not. Different platform libraries
raise different exception types (``discord.HTTPException`` carries
``.status``; ``aiohttp.ClientResponseError`` carries ``.status``; the
Matrix helper wraps ``DownloadError`` in a bare ``RuntimeError`` with no
status), so we classify by inspecting an optional status attribute rather
than coupling this module to every platform's exception hierarchy. When no
status is present we treat the error as transient (retryable).
"""
status = getattr(exc, "status", None)
if status is None:
status = getattr(exc, "status_code", None)
try:
return int(status) in _PERMANENT_DOWNLOAD_STATUSES
except (TypeError, ValueError):
return False
[docs]
async def download_with_retry(
    downloader: Any,
    *,
    attempts: int = 3,
    base_delay: float = _RETRY_BASE_DELAY_SECONDS,
    max_delay: float = _RETRY_MAX_DELAY_SECONDS,
    label: str = "",
) -> tuple[bytes, str, str]:
    """Await *downloader* up to *attempts* times, backing off between tries.

    *downloader* is a zero-argument ``async`` callable returning
    ``(data, mimetype, filename)``. An attempt counts as failed when it raises
    or when ``data`` is falsy (an empty download).

    Args:
        downloader: The coroutine function performing one download.
        attempts: Maximum number of tries; values below 1 are treated as 1.
        base_delay: Delay before the first retry; doubles on each retry.
        max_delay: Upper bound applied to the doubled delay before jitter.
        label: Name used in log messages and in the fallback filename.

    Returns:
        The first non-empty ``(data, mimetype, filename)``. If every attempt
        yields empty data, the last empty tuple is returned unchanged so the
        caller's own empty-media handling decides what to do.

    Raises:
        Exception: Whatever *downloader* raised, immediately when
            :func:`_is_permanent_download_error` classifies it as permanent,
            or on the final attempt otherwise.
    """
    total = max(1, int(attempts))
    shown_label = label or "<unnamed>"
    # Placeholder only; every empty attempt replaces it with the real tuple.
    outcome: tuple[bytes, str, str] = (b"", "application/octet-stream", label or "file")

    attempt = 0
    while True:
        attempt += 1
        final_try = attempt == total
        try:
            payload, content_type, name = await downloader()
            if payload:
                return payload, content_type, name
            # Empty payload: remember it so it can be handed back verbatim.
            outcome = (payload, content_type, name)
            why = "empty result"
        except Exception as exc:  # noqa: BLE001 — classify, then retry or re-raise
            if _is_permanent_download_error(exc) or final_try:
                raise
            why = repr(exc)

        if final_try:
            # Only reachable through the empty-result path; raising runs
            # have already propagated above.
            logger.warning(
                "Media download '%s' still failing after %d attempt(s) (%s)",
                shown_label,
                total,
                why,
            )
            return outcome

        # Exponential backoff capped at max_delay, plus up to 25% jitter.
        backoff = min(max_delay, base_delay * 2 ** (attempt - 1))
        pause = backoff + random.uniform(0.0, backoff * 0.25)
        logger.warning(
            "Media download '%s' attempt %d/%d failed (%s); retrying in %.2fs",
            shown_label,
            attempt,
            total,
            why,
            pause,
        )
        await asyncio.sleep(pause)
# ------------------------------------------------------------------
# GIF / animated WebP -> MP4 re-encoding
# ------------------------------------------------------------------
def _is_animated_webp(data: bytes) -> bool:
"""Check if raw bytes are an *animated* WebP.
Animated WebP files use the extended format (VP8X) with the
animation flag set, and contain ANMF (animation frame) chunks.
Static WebP (VP8 or VP8L) files are left alone.
"""
# RIFF....WEBP header check
if len(data) < 30 or data[:4] != b"RIFF" or data[8:12] != b"WEBP":
return False
# VP8X chunk starts at offset 12 for extended format
if data[12:16] != b"VP8X":
return False
# Flags byte is at offset 20; bit 1 (0x02) = animation flag
if data[20] & 0x02:
return True
# Fallback: scan for ANMF chunk headers (animation frame)
return b"ANMF" in data[:4096]
def _webp_to_gif_bytes(data: bytes) -> bytes | None:
    """Convert an animated WebP to GIF bytes using Pillow.

    Every frame is decoded to ``RGBA`` and the frames are re-saved as a single
    animated GIF, carrying over per-frame durations (default 100 ms) and the
    loop count (default 0). This lets the GIF->MP4 ffmpeg step be reused
    without requiring an ffmpeg build that can demux WebP.

    Args:
        data: Raw WebP file bytes.

    Returns:
        The GIF bytes, or ``None`` when Pillow is not installed, the image has
        a single frame, or decoding/encoding fails (failures are logged).
    """
    try:
        from PIL import Image
    except ImportError:
        logger.warning("Pillow not installed -- cannot convert animated WebP")
        return None

    try:
        # The context manager guarantees the decoder is released even when a
        # frame fails to decode half-way through.
        with Image.open(io.BytesIO(data)) as img:
            n_frames = getattr(img, "n_frames", 1)
            if n_frames <= 1:
                # Single frame -- not actually animated, nothing to do.
                return None

            frames = []
            durations = []
            for index in range(n_frames):
                img.seek(index)
                frames.append(img.convert("RGBA"))
                # Frame duration in ms; ``info`` is refreshed on each seek.
                durations.append(img.info.get("duration", 100))
            loop = img.info.get("loop", 0)

        buf = io.BytesIO()
        frames[0].save(
            buf,
            format="GIF",
            save_all=True,
            append_images=frames[1:],
            duration=durations,
            loop=loop,
            disposal=2,  # restore to background between frames
        )
        gif_data = buf.getvalue()
    except Exception:
        logger.error("Pillow WebP->GIF conversion failed", exc_info=True)
        return None

    logger.info(
        "Pillow WebP->GIF: %d frames, %d bytes -> %d bytes",
        n_frames,
        len(data),
        len(gif_data),
    )
    return gif_data
async def _convert_gif_to_mp4(
    data: bytes,
    timeout: float = 30.0,
) -> bytes | None:
    """Convert GIF bytes to MP4 (H.264 baseline, yuv420p) via ffmpeg.

    The GIF is written to a private temporary directory, ffmpeg is run on it,
    and the resulting file is read back. The temporary directory is always
    removed, and an ffmpeg process that is still running when the function
    exits (timeout, cancellation, unexpected error) is killed and reaped so it
    does not linger.

    Args:
        data: Raw GIF file bytes.
        timeout: Maximum number of seconds to wait for ffmpeg.

    Returns:
        The MP4 bytes on success, or ``None`` when ffmpeg is missing, fails,
        times out, or produces no output (each case is logged).
    """
    ffmpeg_path = shutil.which("ffmpeg")
    if not ffmpeg_path:
        logger.warning("ffmpeg not found -- GIF will not be re-encoded")
        return None

    temp_dir = tempfile.mkdtemp(prefix="gif2mp4_")
    proc = None
    try:
        input_path = os.path.join(temp_dir, "input.gif")
        output_path = os.path.join(temp_dir, "output.mp4")
        with open(input_path, "wb") as f:
            f.write(data)

        proc = await asyncio.create_subprocess_exec(
            ffmpeg_path,
            "-y",
            "-i",
            input_path,
            # H.264 baseline profile -- widest possible decoder support
            "-c:v",
            "libx264",
            "-profile:v",
            "baseline",
            "-level",
            "3.0",
            "-pix_fmt",
            "yuv420p",
            # Pad to even dimensions (required by H.264)
            "-vf",
            "pad=ceil(iw/2)*2:ceil(ih/2)*2",
            "-movflags",
            "+faststart",
            "-an",  # no audio track needed
            output_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            stdin=asyncio.subprocess.DEVNULL,
        )
        _, stderr = await asyncio.wait_for(
            proc.communicate(),
            timeout=timeout,
        )

        if proc.returncode != 0:
            logger.warning(
                "ffmpeg GIF->MP4 failed (rc=%s): %s",
                proc.returncode,
                (stderr or b"").decode(errors="ignore")[:500],
            )
            return None
        if not os.path.exists(output_path):
            logger.warning("ffmpeg produced no output for GIF->MP4")
            return None
        with open(output_path, "rb") as f:
            return f.read()
    except asyncio.TimeoutError:
        logger.error("ffmpeg GIF->MP4 timed out after %.0fs", timeout)
        return None
    except Exception:
        logger.error("Unexpected error in GIF->MP4 conversion", exc_info=True)
        return None
    finally:
        # ``wait_for`` only cancels ``communicate()``; the child itself keeps
        # running unless it is killed explicitly. Do that before deleting the
        # directory it is writing into.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the check and the kill
            await proc.wait()
        shutil.rmtree(temp_dir, ignore_errors=True)
[docs]
async def maybe_reencode_gif(
    data: bytes,
    mimetype: str,
    filename: str,
) -> tuple[bytes, str, str]:
    """Turn a GIF or animated WebP into MP4, leaving everything else alone.

    A file is treated as GIF when its MIME type is ``image/gif`` or its suffix
    is ``.gif``. A WebP (by MIME type or ``.webp`` suffix) is only handled when
    :func:`_is_animated_webp` reports it as animated; it is first converted to
    GIF with Pillow in a worker thread and then fed through the same
    GIF->MP4 ffmpeg step.

    Args:
        data: Raw file bytes.
        mimetype: Declared MIME type of the file.
        filename: Name of the file; only its suffix and stem are used.

    Returns:
        ``(mp4_bytes, "video/mp4", "<stem>.mp4")`` on success; otherwise the
        three inputs unchanged (not a GIF/animated WebP, or a conversion step
        failed and was logged).
    """
    untouched = (data, mimetype, filename)
    path = PurePosixPath(filename)
    suffix = path.suffix.lower()

    looks_like_gif = mimetype == "image/gif" or suffix == ".gif"
    webp_candidate = mimetype == "image/webp" or suffix == ".webp"
    animated_webp = webp_candidate and _is_animated_webp(data)
    if not (looks_like_gif or animated_webp):
        return untouched

    if animated_webp:
        # Go through Pillow first so ffmpeg does not need libwebp support.
        kind = "animated WebP"
        gif_bytes = await asyncio.to_thread(_webp_to_gif_bytes, data)
        if gif_bytes is None:
            logger.warning(
                "WebP->GIF conversion failed for %s -- passing through as-is",
                filename,
            )
            return untouched
    else:
        kind = "GIF"
        gif_bytes = data

    video = await _convert_gif_to_mp4(gif_bytes)
    if video is None:
        logger.warning(
            "%s re-encoding failed for %s -- passing through as-is",
            kind,
            filename,
        )
        return untouched

    logger.info(
        "Re-encoded %s -> MP4: %s (%d bytes -> %d bytes)",
        kind,
        filename,
        len(data),
        len(video),
    )
    return video, "video/mp4", path.stem + ".mp4"
# Map Pillow ``Image.format`` strings to LLM-safe image/* MIME types.
_PIL_FORMAT_TO_MIME: dict[str, str] = {
"PNG": "image/png",
"JPEG": "image/jpeg",
"WEBP": "image/webp",
"GIF": "image/gif",
"BMP": "image/bmp",
"TIFF": "image/tiff",
"TIF": "image/tiff",
"ICO": "image/x-icon",
"MPO": "image/jpeg",
}
[docs]
def detect_image_mimetype_from_bytes(data: bytes) -> str | None:
    """Sniff an ``image/*`` MIME type from raw bytes.

    Detection runs in three stages: fixed magic numbers (PNG, JPEG, GIF, WebP,
    BMP, TIFF), then a case-insensitive search for ``<svg`` near the start of
    the payload (after an optional UTF-8 BOM and leading whitespace), and
    finally a Pillow open whose reported format is looked up in
    ``_PIL_FORMAT_TO_MIME``.

    Args:
        data: Raw file bytes.

    Returns:
        A lowercase ``image/*`` type, or ``None`` when *data* is empty, Pillow
        cannot open it (or is unavailable), or the format is not in the table.
    """
    if not data:
        return None

    leading_signatures = (
        ((b"\x89PNG\r\n\x1a\n",), "image/png"),
        ((b"\xff\xd8\xff",), "image/jpeg"),
        ((b"GIF87a", b"GIF89a"), "image/gif"),
        ((b"BM",), "image/bmp"),
        ((b"II*\x00", b"MM\x00*"), "image/tiff"),
    )
    for magics, mime in leading_signatures:
        if data.startswith(magics):
            return mime
    # WebP needs two checks: the RIFF container tag and the form type.
    if data.startswith(b"RIFF") and data[8:12] == b"WEBP":
        return "image/webp"

    # SVG is text: skip a UTF-8 BOM and ASCII whitespace, then look for the
    # root tag within the next 512 bytes.
    offset = 3 if data.startswith(b"\xef\xbb\xbf") else 0
    size = len(data)
    while offset < size and data[offset] in b"\t\n\r ":
        offset += 1
    if b"<svg" in data[offset : offset + 512].lower():
        return "image/svg+xml"

    try:
        from PIL import Image

        with Image.open(io.BytesIO(data)) as im:
            pil_format = (im.format or "").upper()
    except Exception:
        return None
    return _PIL_FORMAT_TO_MIME.get(pil_format)
# Upper bound (4 MiB) on decoded image size for Claude-family models
# (Anthropic / OpenRouter).
CLAUDE_MAX_IMAGE_BYTES = 4 * 1024**2
def _image_has_alpha(im: Any) -> bool:
    """Tell whether a Pillow image may carry transparent pixels.

    The check looks only at the image mode: ``RGBA`` and ``LA`` always count
    as having alpha, a palette (``P``) image counts only when its ``info``
    dict has a ``transparency`` entry, and every other mode is opaque.

    Args:
        im: Object to inspect. Anything that is not a ``PIL.Image.Image`` is
            reported as opaque.

    Returns:
        ``True`` when the image may contain transparency, else ``False``.
    """
    from PIL import Image

    if isinstance(im, Image.Image):
        mode = im.mode
        if mode == "P":
            return "transparency" in im.info
        return mode in ("RGBA", "LA")
    return False
def _ensure_rgb_for_jpeg(im: Any) -> Any:
    """Produce an opaque ``RGB`` version of *im* suitable for JPEG encoding.

    Images with an alpha band (``RGBA``/``LA``, or a palette image with a
    ``transparency`` entry, which is promoted to ``RGBA`` first) are pasted
    onto a white canvas using the alpha band as the mask. All other modes are
    converted directly with ``convert("RGB")``.

    Args:
        im: The ``PIL.Image.Image`` to flatten.

    Returns:
        A new ``RGB`` ``PIL.Image.Image``.

    Raises:
        TypeError: If *im* is not a ``PIL.Image.Image``.
    """
    from PIL import Image

    if not isinstance(im, Image.Image):
        raise TypeError("expected PIL.Image.Image")

    if im.mode == "P" and "transparency" in im.info:
        # Promote so the palette transparency becomes a real alpha band.
        im = im.convert("RGBA")

    # Index of the alpha band within ``split()`` for the modes that have one.
    alpha_index = {"RGBA": 3, "LA": 1}.get(im.mode)
    if alpha_index is None:
        return im.convert("RGB")

    canvas = Image.new("RGB", im.size, (255, 255, 255))
    canvas.paste(im, mask=im.split()[alpha_index])
    return canvas
[docs]
def shrink_image_under_max_bytes(
    data: bytes,
    declared_mimetype: str = "",
    *,
    max_bytes: int = CLAUDE_MAX_IMAGE_BYTES,
) -> bytes | None:
    """Re-encode and, if needed, downscale raster image bytes to fit *max_bytes*.

    Used for API providers with a hard per-image size limit. Input already at
    or below the limit is returned unchanged. Otherwise the first frame is
    decoded, its EXIF orientation is applied to the pixels (re-encoding does
    not carry EXIF over, so an orientation tag would otherwise be lost and the
    picture would come out rotated), and the image is encoded at descending
    quality. If no quality fits, it is scaled down by 15% per round until it
    fits, the shorter edge would drop below 256 px, or 40 rounds have run.

    The returned bytes may be in a different format from the input: WebP or
    PNG for images with alpha, JPEG otherwise.

    Args:
        data: Raw image bytes.
        declared_mimetype: MIME type claimed by the source (parameters are
            ignored); ``image/svg+xml`` is never shrunk.
        max_bytes: Size limit for the returned bytes.

    Returns:
        Bytes no larger than *max_bytes*, or ``None`` when Pillow is missing,
        the input is SVG or cannot be decoded, or it still does not fit after
        all resize attempts.
    """
    if len(data) <= max_bytes:
        return data

    declared_base = (
        declared_mimetype.split(";", 1)[0].strip().lower() if declared_mimetype else ""
    )
    if declared_base == "image/svg+xml":
        # Vector input is never shrinkable; skip the decode attempt entirely.
        return None

    try:
        from PIL import Image, ImageOps
    except ImportError:
        return None

    try:
        # The context manager releases the decoder even if load/copy raises.
        with Image.open(io.BytesIO(data)) as src:
            src.load()
            if getattr(src, "n_frames", 1) > 1:
                src.seek(0)
            try:
                work = ImageOps.exif_transpose(src)
            except Exception:
                # Malformed EXIF must not prevent shrinking.
                work = None
            if work is None or work is src:
                work = src.copy()
    except Exception:
        return None

    min_edge = 256
    scale_factor = 0.85
    qualities = (85, 75, 65, 55, 45, 35, 25, 20, 15, 10)
    max_iterations = 40

    def try_encode(img: Any) -> bytes | None:
        """Encode *img* at descending quality; return the first result that fits.

        Images with alpha are tried as WebP across ``qualities`` and then as
        optimized PNG; opaque images are flattened to RGB and tried as JPEG
        across the same ladder.

        Args:
            img: The ``PIL.Image.Image`` to encode.

        Returns:
            Encoded bytes at or below ``max_bytes``, or ``None`` if nothing
            fit or an encoder raised.
        """
        buf = io.BytesIO()
        if _image_has_alpha(img):
            for q in qualities:
                buf.seek(0)
                buf.truncate(0)
                try:
                    img.save(buf, format="WEBP", quality=q, method=6)
                except Exception:
                    # WebP encoder unavailable or failed: fall through to PNG.
                    break
                out = buf.getvalue()
                if len(out) <= max_bytes:
                    return out
            buf.seek(0)
            buf.truncate(0)
            try:
                img.save(buf, format="PNG", optimize=True, compress_level=9)
            except Exception:
                return None
            out = buf.getvalue()
            if len(out) <= max_bytes:
                return out
        else:
            rgb = _ensure_rgb_for_jpeg(img)
            for q in qualities:
                buf.seek(0)
                buf.truncate(0)
                try:
                    rgb.save(buf, format="JPEG", quality=q, optimize=True)
                except Exception:
                    return None
                out = buf.getvalue()
                if len(out) <= max_bytes:
                    return out
        return None

    for _ in range(max_iterations):
        encoded = try_encode(work)
        if encoded is not None:
            return encoded
        new_w = max(1, int(work.width * scale_factor))
        new_h = max(1, int(work.height * scale_factor))
        if min(new_w, new_h) < min_edge:
            return None
        try:
            work = work.resize((new_w, new_h), Image.Resampling.LANCZOS)
        except Exception:
            return None
    return None
[docs]
def reconcile_image_mimetype_sync(data: bytes, declared: str) -> str:
    """Replace a wrong ``image/*`` declaration with the type sniffed from bytes.

    Parameters after ``;`` are dropped from *declared* before comparing. When
    the bare type is not an image type the original *declared* string is
    returned untouched. Otherwise the bytes are sniffed with
    :func:`detect_image_mimetype_from_bytes`; a differing result wins and is
    logged at info level.

    Args:
        data: Raw image bytes to sniff.
        declared: MIME type claimed by the source (may include parameters).

    Returns:
        The detected type when it differs from the declaration; the
        parameter-stripped declaration when they agree or detection fails;
        *declared* unchanged for non-image declarations.
    """
    bare = declared.partition(";")[0].strip()
    bare_lower = bare.lower()
    if not bare_lower.startswith("image/"):
        return declared

    sniffed = detect_image_mimetype_from_bytes(data)
    if not sniffed or sniffed.lower() == bare_lower:
        return bare

    logger.info(
        "Image MIME reconciled: declared %s -> detected %s",
        bare,
        sniffed,
    )
    return sniffed
[docs]
async def reconcile_image_mimetype(data: bytes, declared: str) -> str:
    """Run :func:`reconcile_image_mimetype_sync` in a worker thread.

    The synchronous reconcile is dispatched through ``asyncio.to_thread`` so
    the event loop is not blocked while it runs.

    Args:
        data: Raw image bytes to sniff.
        declared: MIME type claimed by the source.

    Returns:
        Whatever :func:`reconcile_image_mimetype_sync` returns for the same
        arguments.
    """
    reconciled = await asyncio.to_thread(
        reconcile_image_mimetype_sync,
        data,
        declared,
    )
    return reconciled
[docs]
def _text_content_part(text: str) -> dict[str, Any]:
    """Wrap *text* in a ``text`` content part."""
    return {"type": "text", "text": text}


def _untrusted_text_content_part(filename: str, decoded: str) -> dict[str, Any]:
    """Inline decoded file text inside the ``<untrusted_file_contents>`` wrapper."""
    # NOTE(review): *filename* and *decoded* are interpolated verbatim, so a
    # crafted name or body containing the closing tag can break out of the
    # wrapper. Escaping would change what the model sees -- confirm with the
    # prompt owners before altering the format.
    return _text_content_part(
        f'<untrusted_file_contents filename="{filename}">\n'
        f"{decoded}\n"
        f"</untrusted_file_contents>"
    )


def _file_content_part(filename: str, mimetype: str, b64: str) -> dict[str, Any]:
    """Build a ``file`` content part carrying *b64* as a ``data:`` URI."""
    return {
        "type": "file",
        "file": {
            "filename": filename,
            "file_data": f"data:{mimetype};base64,{b64}",
        },
    }


async def _b64_ascii(data: bytes) -> str:
    """Base64-encode *data* in a worker thread and return it as ASCII text."""
    return await asyncio.to_thread(
        lambda: base64.b64encode(data).decode("ascii"),
    )


async def media_to_content_parts(
    data: bytes,
    mimetype: str,
    filename: str,
    body_text: str | None = None,
) -> list[dict[str, Any]]:
    """Build an OpenRouter multimodal content-parts list from raw media.

    Processing order: an optional caption part; a text note for empty media;
    GIF / animated-WebP re-encoding to MP4; PDF conversion for office-style
    documents; then one part chosen by MIME category (inline text, image,
    audio/PDF file, video). Anything else is inlined as text when it decodes
    as UTF-8 and replaced by a placeholder note otherwise.

    Parameters
    ----------
    data:
        The raw file bytes.
    mimetype:
        MIME type of the file (e.g. ``"image/png"``). Parameters after ``;``
        and letter case are ignored; ``None`` is treated as unknown.
    filename:
        Human-readable filename.
    body_text:
        Optional caption / message body text to include alongside the
        media. When present it is prepended as a ``text`` content part.

    Returns
    -------
    list[dict]
        A list of content-part dicts suitable for the ``content`` field
        of an OpenRouter user message.
    """
    parts: list[dict[str, Any]] = []
    if body_text:
        parts.append(_text_content_part(body_text))

    # Never emit an empty ``data:`` URI: an empty image part reaches the model
    # as a blank image with no error anywhere. Surface a text note instead.
    if not data:
        parts.append(
            _text_content_part(
                f"[Attachment: {filename or 'file'} — could not be loaded]"
            )
        )
        return parts

    # Reduce the declared type to its bare lower-case form so values such as
    # ``application/pdf; x`` or ``Image/PNG`` still match the tables and
    # category checks below.
    mimetype = (mimetype or "").split(";", 1)[0].strip().lower()

    # Re-encode GIF / animated WebP -> MP4 before any further processing.
    data, mimetype, filename = await maybe_reencode_gif(
        data,
        mimetype,
        filename,
    )

    ext = PurePosixPath(filename).suffix.lower()
    needs_conversion = (
        mimetype in _CONVERTIBLE_MIME_TYPES or ext in _CONVERTIBLE_EXTENSIONS
    )

    # application/* types that are really text (JSON, XML, YAML, ...) are sent
    # as text/plain rather than falling into the unknown-binary branch.
    if mimetype in _TEXT_REMAP_MIME_TYPES:
        mimetype = "text/plain"

    if needs_conversion:
        pdf_data = await _convert_to_pdf(data, filename)
        if pdf_data is None:
            parts.append(
                _text_content_part(
                    f"[Attachment: {filename} — conversion to PDF failed; "
                    f"the document could not be processed]"
                )
            )
        else:
            pdf_name = PurePosixPath(filename).stem + ".pdf"
            parts.append(
                _file_content_part(
                    pdf_name,
                    "application/pdf",
                    await _b64_ascii(pdf_data),
                )
            )
        return parts

    if mimetype.split("/")[0] == "image":
        mimetype = await reconcile_image_mimetype(data, mimetype)
    category = mimetype.split("/")[0]

    # Text files are inlined directly -- no base64 needed.
    if category == "text":
        try:
            decoded = data.decode("utf-8")
        except (UnicodeDecodeError, ValueError):
            decoded = data.decode("latin-1")
        parts.append(_untrusted_text_content_part(filename, decoded))
        return parts

    # Base64 is computed only in the branches that embed it; the unknown-binary
    # branch below never uses it, so encoding up front would be wasted work.
    if category == "image":
        b64 = await _b64_ascii(data)
        parts.append(
            {
                "type": "image_url",
                "image_url": {"url": f"data:{mimetype};base64,{b64}"},
            }
        )
    elif category == "audio" or mimetype == "application/pdf":
        # Audio goes out as a ``file`` part with a data URI (not the
        # OpenAI-only ``input_audio`` structure) so an OpenAI-to-Gemini proxy
        # can translate it to native ``inline_data``. PDFs use the same shape.
        parts.append(_file_content_part(filename, mimetype, await _b64_ascii(data)))
    elif category == "video":
        b64 = await _b64_ascii(data)
        parts.append(
            {
                "type": "video_url",
                "video_url": {"url": f"data:{mimetype};base64,{b64}"},
            }
        )
    else:
        # Unknown binary: inline it when it decodes as UTF-8, otherwise send a
        # placeholder so an unsupported MIME type is never forwarded.
        try:
            decoded = data.decode("utf-8")
        except (UnicodeDecodeError, ValueError):
            parts.append(
                _text_content_part(
                    f"[Attachment: {filename} (type: {mimetype}) — "
                    f"unsupported file type, contents omitted]"
                )
            )
        else:
            parts.append(_untrusted_text_content_part(filename, decoded))
    return parts
# ------------------------------------------------------------------
# LibreOffice PDF conversion
# ------------------------------------------------------------------
# Executable names tried in order when looking for LibreOffice on PATH.
_LIBREOFFICE_BINARIES = ("libreoffice", "soffice", "lowriter")


async def _convert_to_pdf(
    data: bytes,
    filename: str,
    timeout: float = 60.0,
) -> bytes | None:
    """Convert a document to PDF using LibreOffice in headless mode.

    The document is written to a private temporary directory and each name in
    ``_LIBREOFFICE_BINARIES`` found on PATH is tried in turn until one exits
    with status 0. The temporary directory is always removed, and a converter
    that is still running when the function exits (timeout, cancellation,
    unexpected error) is killed and reaped.

    Args:
        data: Raw document bytes.
        filename: Original filename; only its suffix is used, to name the
            temporary input file.
        timeout: Maximum number of seconds to wait for one converter run.

    Returns:
        The PDF bytes on success, or ``None`` when no binary is available, the
        conversion fails or times out, or no PDF is produced (all logged).
    """
    temp_dir = tempfile.mkdtemp(prefix="doc2pdf_")
    proc = None
    try:
        input_ext = PurePosixPath(filename).suffix or ".tmp"
        input_path = os.path.join(temp_dir, f"input{input_ext}")
        with open(input_path, "wb") as f:
            f.write(data)

        last_error: Exception | None = None
        found_binary = False
        for binary_name in _LIBREOFFICE_BINARIES:
            binary_path = shutil.which(binary_name)
            if not binary_path:
                continue
            found_binary = True
            try:
                proc = await asyncio.create_subprocess_exec(
                    binary_path,
                    "--headless",
                    "--norestore",
                    "--invisible",
                    "--nodefault",
                    "--nolockcheck",
                    "--nofirststartwizard",
                    "--convert-to",
                    "pdf",
                    "--outdir",
                    temp_dir,
                    input_path,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    stdin=asyncio.subprocess.DEVNULL,
                    cwd=temp_dir,
                )
                _, stderr = await asyncio.wait_for(
                    proc.communicate(),
                    timeout=timeout,
                )
            except FileNotFoundError as exc:
                # Binary vanished between ``which`` and exec; try the next one.
                last_error = exc
                continue
            except asyncio.TimeoutError:
                logger.error(
                    "LibreOffice conversion timed out after %.0fs for %s",
                    timeout,
                    filename,
                )
                # The ``finally`` block kills and reaps the process.
                return None

            if proc.returncode == 0:
                break
            msg = stderr.decode(errors="ignore") if stderr else ""
            logger.warning(
                "%s conversion failed (rc=%s): %s",
                binary_name,
                proc.returncode,
                msg[:500],
            )
            last_error = RuntimeError(
                f"{binary_name} exited with {proc.returncode}",
            )

        if not found_binary:
            # Make the "no output" log below say why nothing was produced.
            last_error = FileNotFoundError(
                "no LibreOffice binary found on PATH "
                f"(tried: {', '.join(_LIBREOFFICE_BINARIES)})"
            )

        expected_pdf = os.path.join(
            temp_dir,
            PurePosixPath(input_path).stem + ".pdf",
        )
        pdf_path = expected_pdf if os.path.exists(expected_pdf) else None
        if not pdf_path:
            # Fall back to any PDF the converter left in the output directory.
            for entry in os.listdir(temp_dir):
                if entry.lower().endswith(".pdf"):
                    pdf_path = os.path.join(temp_dir, entry)
                    break
        if not pdf_path or not os.path.exists(pdf_path):
            logger.error(
                "PDF conversion produced no output for %s: %s",
                filename,
                last_error,
            )
            return None
        with open(pdf_path, "rb") as f:
            return f.read()
    except Exception:
        logger.error(
            "Unexpected error converting %s to PDF",
            filename,
            exc_info=True,
        )
        return None
    finally:
        # ``wait_for`` only cancels ``communicate()``; kill the child and wait
        # for it so it is not left behind as a zombie or still writing into
        # the directory removed below.
        if proc is not None and proc.returncode is None:
            try:
                proc.kill()
            except ProcessLookupError:
                pass  # exited between the check and the kill
            await proc.wait()
        shutil.rmtree(temp_dir, ignore_errors=True)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _guess_audio_format(filename: str, mimetype: str) -> str:
"""Best-effort short audio format token derived from filename or MIME type.
Produces the bare codec/container token (such as ``wav`` or ``mp3``) that the
OpenRouter ``input_audio`` content type expects, preferring a recognized file
extension and falling back to the MIME subtype when the extension is unknown
or missing. This is a pure string helper with no I/O or side effects.
No internal callers were found by grepping the repo; it is retained as a
utility for building ``input_audio`` parts (the live media path in
:func:`media_to_content_parts` currently sends audio via the data-URI
``file`` part instead).
Args:
filename: The source filename whose suffix is inspected first.
mimetype: The MIME type used as the fallback source of the subtype.
Returns:
str: A lowercase format token, e.g. ``"mp3"`` or the MIME subtype.
"""
ext = PurePosixPath(filename).suffix.lstrip(".").lower()
known = (
"wav",
"mp3",
"ogg",
"flac",
"aac",
"m4a",
"aiff",
"pcm16",
"pcm24",
)
if ext in known:
return ext
# Last resort: use the MIME subtype
return mimetype.split("/")[-1].lower()