# Source code for platforms.media_common

"""Shared media-to-content-part conversion for all platforms.

Converts raw bytes + MIME type into the multimodal content-part format
expected by the OpenRouter chat-completions API.  Platform-specific
*download* logic lives in each adapter; this module only handles the
format conversion.

Office / ODF / EPUB documents whose MIME types are not supported by
the downstream LLM are automatically converted to PDF via LibreOffice
headless before being embedded as content parts.

GIF and animated WebP images are automatically re-encoded as MP4
(H.264 baseline) so the Gemini API receives a well-supported video
format instead of GIF/animated-WebP.
"""

from __future__ import annotations

import asyncio
import base64
import io
import logging
import os
import random
import shutil
import tempfile
from pathlib import PurePosixPath
from typing import Any

logger = logging.getLogger(__name__)

# Maps broad MIME categories / specific types to audio format strings
# accepted by the OpenRouter ``input_audio`` content type.
#
# Keys are lowercase ``audio/*`` MIME strings; several aliases of the same
# container (e.g. ``audio/wav``, ``audio/x-wav``, ``audio/wave``) share one
# short format token.
_AUDIO_FORMAT_MAP: dict[str, str] = {
    "audio/wav": "wav",
    "audio/x-wav": "wav",
    "audio/wave": "wav",
    "audio/mp3": "mp3",
    "audio/mpeg": "mp3",
    "audio/ogg": "ogg",
    "audio/flac": "flac",
    "audio/x-flac": "flac",
    "audio/aac": "aac",
    "audio/mp4": "m4a",
    "audio/x-m4a": "m4a",
    "audio/m4a": "m4a",
    "audio/aiff": "aiff",
    "audio/x-aiff": "aiff",
}

# Application/* MIME types that are really text and should be sent as
# ``text/plain`` so the LLM receives a supported type.
#
# Consulted by ``media_to_content_parts``, which rewrites a matching MIME
# type to ``text/plain`` before choosing the content-part shape.
_TEXT_REMAP_MIME_TYPES: set[str] = {
    "application/json",
    "application/xml",
    "application/javascript",
    "application/x-javascript",
    "application/typescript",
    "application/x-yaml",
    "application/yaml",
    "application/x-sh",
    "application/x-shellscript",
    "application/toml",
    "application/x-toml",
    "application/sql",
    "application/graphql",
}

# MIME types that should be converted to PDF before sending to the LLM.
#
# ``media_to_content_parts`` routes a file through ``_convert_to_pdf`` when
# its MIME type is in this set (or its extension is in
# ``_CONVERTIBLE_EXTENSIONS``).
_CONVERTIBLE_MIME_TYPES: set[str] = {
    # Word-processing
    "application/msword",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
    "application/rtf",
    "text/rtf",
    "application/vnd.oasis.opendocument.text",
    # Spreadsheets
    "application/vnd.ms-excel",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    "application/vnd.ms-excel.sheet.macroEnabled.12",
    "application/vnd.ms-excel.sheet.binary.macroEnabled.12",
    "application/vnd.oasis.opendocument.spreadsheet",
    # Presentations
    "application/vnd.ms-powerpoint",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation",
    "application/vnd.openxmlformats-officedocument.presentationml.slideshow",
    "application/vnd.ms-powerpoint.presentation.macroEnabled.12",
    "application/vnd.ms-powerpoint.slideshow.macroEnabled.12",
    "application/vnd.oasis.opendocument.presentation",
    # E-books
    "application/epub+zip",
}

# File extensions that should trigger PDF conversion (fallback when MIME
# type is generic like ``application/octet-stream``).
#
# Entries are lowercase and include the leading dot, matching the
# ``PurePosixPath(filename).suffix.lower()`` value they are compared against.
_CONVERTIBLE_EXTENSIONS: set[str] = {
    ".doc",
    ".docx",
    ".rtf",
    ".odt",
    ".xls",
    ".xlsx",
    ".xlsm",
    ".xlsb",
    ".ods",
    ".ppt",
    ".pptx",
    ".pptm",
    ".pps",
    ".ppsx",
    ".odp",
    ".epub",
}

# ------------------------------------------------------------------
# CDN download retry
# ------------------------------------------------------------------

#: HTTP status codes that mean "don't bother retrying" — the resource is
#: genuinely gone or forbidden, so hammering the CDN only wastes time.
_PERMANENT_DOWNLOAD_STATUSES: frozenset[int] = frozenset({400, 401, 403, 404, 410})

#: Default backoff knobs for :func:`download_with_retry` (the attempt count is
#: configurable per-call; these shape the delay between attempts).
_RETRY_BASE_DELAY_SECONDS: float = 0.5
_RETRY_MAX_DELAY_SECONDS: float = 8.0


def _is_permanent_download_error(exc: BaseException) -> bool:
    """Return ``True`` when *exc* represents a non-retryable download failure.

    A transient CDN blip (timeout, connection reset, 5xx, 429) should be
    retried; a genuine 404/403/410 should not. Different platform libraries
    raise different exception types (``discord.HTTPException`` carries
    ``.status``; ``aiohttp.ClientResponseError`` carries ``.status``; the
    Matrix helper wraps ``DownloadError`` in a bare ``RuntimeError`` with no
    status), so we classify by inspecting an optional status attribute rather
    than coupling this module to every platform's exception hierarchy. When no
    status is present we treat the error as transient (retryable).
    """
    status = getattr(exc, "status", None)
    if status is None:
        status = getattr(exc, "status_code", None)
    try:
        return int(status) in _PERMANENT_DOWNLOAD_STATUSES
    except (TypeError, ValueError):
        return False


[docs] async def download_with_retry( downloader: Any, *, attempts: int = 3, base_delay: float = _RETRY_BASE_DELAY_SECONDS, max_delay: float = _RETRY_MAX_DELAY_SECONDS, label: str = "", ) -> tuple[bytes, str, str]: """Call *downloader* with bounded retry on transient CDN failures. *downloader* is the same ``async`` callable the platform adapters already pass to :meth:`MediaCache.get_or_download` — it returns ``(data, mimetype, filename)``. A single transient Discord/Matrix CDN blip otherwise drops the attachment for that one message (the cache fix only stops the failure from becoming *permanent*); a few retries close that one-shot loss. Retries on: * any raised exception that is not a permanent HTTP status (see :func:`_is_permanent_download_error`), and * an empty-bytes result (``not data``) — treated as a failed download, consistent with ``MediaCache.get_or_download`` which refuses to cache it. Permanent errors (404/403/410/…) re-raise immediately. After the final attempt the last result is returned as-is (possibly empty) so the existing empty-media handling downstream stays in control: ``get_or_download`` will not cache it and ``media_to_content_parts`` emits a text note instead of a blank image. Backoff is exponential (``base_delay * 2**n``) capped at *max_delay*, with uniform jitter to avoid thundering-herd re-fetches. """ attempts = max(1, int(attempts)) # Safe fallback if every attempt comes back empty; the empty-result branch # below overwrites it with the real (mimetype, filename) when available. last_result: tuple[bytes, str, str] = (b"", "application/octet-stream", label or "file") for attempt in range(1, attempts + 1): try: data, mimetype, filename = await downloader() if data: return data, mimetype, filename # Empty bytes == failed download. Keep the tuple so we can return # it verbatim if every attempt comes back empty. 
last_result = (data, mimetype, filename) reason = "empty result" except Exception as exc: # noqa: BLE001 — classify, then retry or re-raise if _is_permanent_download_error(exc): raise if attempt == attempts: raise reason = repr(exc) if attempt == attempts: logger.warning( "Media download '%s' still failing after %d attempt(s) (%s)", label or "<unnamed>", attempts, reason, ) break delay = min(max_delay, base_delay * (2 ** (attempt - 1))) delay += random.uniform(0.0, delay * 0.25) logger.warning( "Media download '%s' attempt %d/%d failed (%s); retrying in %.2fs", label or "<unnamed>", attempt, attempts, reason, delay, ) await asyncio.sleep(delay) # Exhausted: return the last (empty) result so downstream empty-handling # runs (``get_or_download`` won't cache it; ``media_to_content_parts`` emits # a text note). Reachable only via the empty-result path — an all-raising # run re-raises above. return last_result
# ------------------------------------------------------------------ # GIF / animated WebP -> MP4 re-encoding # ------------------------------------------------------------------ def _is_animated_webp(data: bytes) -> bool: """Check if raw bytes are an *animated* WebP. Animated WebP files use the extended format (VP8X) with the animation flag set, and contain ANMF (animation frame) chunks. Static WebP (VP8 or VP8L) files are left alone. """ # RIFF....WEBP header check if len(data) < 30 or data[:4] != b"RIFF" or data[8:12] != b"WEBP": return False # VP8X chunk starts at offset 12 for extended format if data[12:16] != b"VP8X": return False # Flags byte is at offset 20; bit 1 (0x02) = animation flag if data[20] & 0x02: return True # Fallback: scan for ANMF chunk headers (animation frame) return b"ANMF" in data[:4096] def _webp_to_gif_bytes(data: bytes) -> bytes | None: """Convert animated WebP to GIF using Pillow. Pillow has native WebP support (including animated) and does NOT require ffmpeg's libwebp demuxer. This reliably extracts all frames and re-encodes as GIF so the proven GIF->MP4 ffmpeg pipeline can take over. Returns GIF bytes on success, or None on failure. 
""" try: import io from PIL import Image img = Image.open(io.BytesIO(data)) # Check if actually animated n_frames = getattr(img, "n_frames", 1) if n_frames <= 1: # Single frame -- not actually animated, bail return None # Extract all frames frames = [] durations = [] for i in range(n_frames): img.seek(i) frame = img.convert("RGBA") frames.append(frame) # Get frame duration (ms), default 100ms durations.append(img.info.get("duration", 100)) if not frames: return None # Write as GIF buf = io.BytesIO() frames[0].save( buf, format="GIF", save_all=True, append_images=frames[1:], duration=durations, loop=img.info.get("loop", 0), disposal=2, # restore to background ) gif_data = buf.getvalue() logger.info( "Pillow WebP->GIF: %d frames, %d bytes -> %d bytes", n_frames, len(data), len(gif_data), ) return gif_data except ImportError: logger.warning("Pillow not installed -- cannot convert animated WebP") return None except Exception: logger.error("Pillow WebP->GIF conversion failed", exc_info=True) return None async def _convert_gif_to_mp4( data: bytes, timeout: float = 30.0, ) -> bytes | None: """Convert GIF bytes to MP4 (H.264 baseline, yuv420p) via ffmpeg. Returns the MP4 bytes on success, or ``None`` on failure. 
""" ffmpeg_path = shutil.which("ffmpeg") if not ffmpeg_path: logger.warning("ffmpeg not found -- GIF will not be re-encoded") return None temp_dir = tempfile.mkdtemp(prefix="gif2mp4_") try: input_path = os.path.join(temp_dir, "input.gif") output_path = os.path.join(temp_dir, "output.mp4") with open(input_path, "wb") as f: f.write(data) proc = await asyncio.create_subprocess_exec( ffmpeg_path, "-y", "-i", input_path, # H.264 baseline profile -- widest possible decoder support "-c:v", "libx264", "-profile:v", "baseline", "-level", "3.0", "-pix_fmt", "yuv420p", # Pad to even dimensions (required by H.264) "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2", "-movflags", "+faststart", "-an", # no audio track needed output_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.DEVNULL, ) _, stderr = await asyncio.wait_for( proc.communicate(), timeout=timeout, ) if proc.returncode != 0: logger.warning( "ffmpeg GIF->MP4 failed (rc=%s): %s", proc.returncode, (stderr or b"").decode(errors="ignore")[:500], ) return None if not os.path.exists(output_path): logger.warning("ffmpeg produced no output for GIF->MP4") return None with open(output_path, "rb") as f: return f.read() except asyncio.TimeoutError: logger.error("ffmpeg GIF->MP4 timed out after %.0fs", timeout) return None except Exception: logger.error("Unexpected error in GIF->MP4 conversion", exc_info=True) return None finally: try: shutil.rmtree(temp_dir) except OSError: pass
async def maybe_reencode_gif(
    data: bytes,
    mimetype: str,
    filename: str,
) -> tuple[bytes, str, str]:
    """Turn a GIF or animated WebP into an MP4, leaving anything else alone.

    A GIF (by MIME type or ``.gif`` extension) goes straight to
    :func:`_convert_gif_to_mp4`. A WebP (by MIME type or ``.webp`` extension)
    that :func:`_is_animated_webp` reports as animated is first converted to
    GIF by :func:`_webp_to_gif_bytes` in a worker thread, then takes the same
    path.

    Returns ``(data, mimetype, filename)``: on success the MP4 bytes,
    ``"video/mp4"`` and the original stem with a ``.mp4`` suffix; otherwise
    the three inputs unchanged (not a GIF/animated WebP, or a conversion
    step failed).
    """
    source_path = PurePosixPath(filename)
    suffix = source_path.suffix.lower()

    gif_input = mimetype == "image/gif" or suffix == ".gif"
    webp_input = (
        mimetype == "image/webp" or suffix == ".webp"
    ) and _is_animated_webp(data)

    untouched = (data, mimetype, filename)
    if not (gif_input or webp_input):
        return untouched

    if webp_input:
        # Pillow handles the WebP decode so ffmpeg never needs WebP support.
        kind = "animated WebP"
        intermediate = await asyncio.to_thread(_webp_to_gif_bytes, data)
        if intermediate is None:
            logger.warning(
                "WebP->GIF conversion failed for %s -- passing through as-is",
                filename,
            )
            return untouched
    else:
        kind = "GIF"
        intermediate = data

    video = await _convert_gif_to_mp4(intermediate)
    if video is None:
        logger.warning(
            "%s re-encoding failed for %s -- passing through as-is",
            kind,
            filename,
        )
        return untouched

    logger.info(
        "Re-encoded %s -> MP4: %s (%d bytes -> %d bytes)",
        kind,
        filename,
        len(data),
        len(video),
    )
    return video, "video/mp4", source_path.stem + ".mp4"
# Map Pillow ``Image.format`` strings to LLM-safe image/* MIME types. _PIL_FORMAT_TO_MIME: dict[str, str] = { "PNG": "image/png", "JPEG": "image/jpeg", "WEBP": "image/webp", "GIF": "image/gif", "BMP": "image/bmp", "TIFF": "image/tiff", "TIF": "image/tiff", "ICO": "image/x-icon", "MPO": "image/jpeg", }
[docs] def detect_image_mimetype_from_bytes(data: bytes) -> str | None: """Best-effort image MIME from raw bytes (magic + Pillow). Returns a lowercase ``image/*`` type, or ``None`` if unknown. """ if not data: return None if data.startswith(b"\x89PNG\r\n\x1a\n"): return "image/png" if data.startswith(b"\xff\xd8\xff"): return "image/jpeg" if len(data) >= 6 and data[:6] in (b"GIF87a", b"GIF89a"): return "image/gif" if data[:4] == b"RIFF" and data[8:12] == b"WEBP": return "image/webp" if data.startswith(b"BM"): return "image/bmp" if data.startswith(b"II*\x00") or data.startswith(b"MM\x00*"): return "image/tiff" i = 0 if data.startswith(b"\xef\xbb\xbf"): i = 3 while i < len(data) and data[i] in (9, 10, 13, 32): i += 1 probe = data[i : i + 4096].lower() if probe.startswith(b"<svg") or (b"<svg" in probe[:512]): return "image/svg+xml" try: import io from PIL import Image with Image.open(io.BytesIO(data)) as im: fmt = (im.format or "").upper() except Exception: return None return _PIL_FORMAT_TO_MIME.get(fmt)
# Max decoded image size for Claude-family models (Anthropic / OpenRouter).
CLAUDE_MAX_IMAGE_BYTES = 4 * 1024 * 1024


def _image_has_alpha(im: Any) -> bool:
    """Say whether a Pillow image may carry transparency.

    ``RGBA`` and ``LA`` images always count as having alpha. A palette
    (``P``) image counts only when its ``info`` dict has a ``transparency``
    entry. Every other mode, and any object that is not a
    ``PIL.Image.Image``, is reported as opaque.

    Args:
        im: Object to inspect.

    Returns:
        bool: ``True`` when transparency may be present, else ``False``.
    """
    from PIL import Image

    if not isinstance(im, Image.Image):
        return False
    mode = im.mode
    if mode == "P":
        return "transparency" in im.info
    return mode in ("RGBA", "LA")


def _ensure_rgb_for_jpeg(im: Any) -> Any:
    """Produce an opaque ``RGB`` copy of a Pillow image for JPEG encoding.

    Images with an alpha band (``RGBA``/``LA``) are pasted onto a white
    canvas using that band as the mask. A palette image with a
    ``transparency`` entry is first promoted to ``RGBA`` and then flattened
    the same way. Everything else is converted directly to ``RGB``.

    Args:
        im: A ``PIL.Image.Image`` to flatten.

    Returns:
        Any: A new ``RGB`` ``PIL.Image.Image``.

    Raises:
        TypeError: If *im* is not a ``PIL.Image.Image``.
    """
    from PIL import Image

    if not isinstance(im, Image.Image):
        raise TypeError("expected PIL.Image.Image")

    if im.mode == "P" and "transparency" in im.info:
        # Promote so the alpha-flatten branch below can handle it.
        im = im.convert("RGBA")

    if im.mode in ("RGBA", "LA"):
        # The alpha band is the last one in both modes.
        alpha_band = im.split()[-1]
        canvas = Image.new("RGB", im.size, (255, 255, 255))
        canvas.paste(im, mask=alpha_band)
        return canvas

    return im.convert("RGB")


def shrink_image_under_max_bytes(
    data: bytes,
    declared_mimetype: str = "",
    *,
    max_bytes: int = CLAUDE_MAX_IMAGE_BYTES,
) -> bytes | None:
    """Re-encode, and if needed downscale, an image until it fits *max_bytes*.

    Input already at or below *max_bytes* is returned unchanged. Otherwise
    the first frame is decoded with Pillow and encoded over a descending
    quality ladder; when nothing fits, both dimensions are scaled by 0.85
    and the ladder is retried, for at most 40 rounds.

    Returns ``None`` when Pillow is not installed, the bytes cannot be
    decoded, the declared type is ``image/svg+xml``, the next downscale
    would take the shorter edge below 256 px, a resize fails, or no encoding
    ever fits.
    """
    if len(data) <= max_bytes:
        return data

    try:
        from PIL import Image
    except ImportError:
        return None

    try:
        source = Image.open(io.BytesIO(data))
        source.load()
        if getattr(source, "n_frames", 1) > 1:
            # Multi-frame input: only the first frame is kept.
            source.seek(0)
        work = source.copy()
        source.close()
    except Exception:
        return None

    declared_base = ""
    if declared_mimetype:
        declared_base = declared_mimetype.split(";", 1)[0].strip().lower()
    if declared_base == "image/svg+xml":
        return None

    min_edge = 256
    scale_factor = 0.85
    qualities = (85, 75, 65, 55, 45, 35, 25, 20, 15, 10)
    max_iterations = 40

    def render(img: Any, fmt: str, **options: Any) -> bytes | None:
        """Save *img* as *fmt* into memory; ``None`` if Pillow raises."""
        sink = io.BytesIO()
        try:
            img.save(sink, format=fmt, **options)
        except Exception:
            return None
        return sink.getvalue()

    def try_encode(img: Any) -> bytes | None:
        """Return the first encoding of *img* that fits, else ``None``.

        Images with alpha walk the quality ladder as WebP and then try one
        optimized PNG; a WebP save error skips straight to the PNG attempt.
        Opaque images are flattened to RGB and walk the ladder as JPEG,
        giving up on the first save error.
        """
        if _image_has_alpha(img):
            for quality in qualities:
                blob = render(img, "WEBP", quality=quality, method=6)
                if blob is None:
                    break
                if len(blob) <= max_bytes:
                    return blob
            blob = render(img, "PNG", optimize=True, compress_level=9)
            if blob is not None and len(blob) <= max_bytes:
                return blob
            return None

        flattened = _ensure_rgb_for_jpeg(img)
        for quality in qualities:
            blob = render(flattened, "JPEG", quality=quality, optimize=True)
            if blob is None:
                return None
            if len(blob) <= max_bytes:
                return blob
        return None

    for _ in range(max_iterations):
        fitted = try_encode(work)
        if fitted is not None:
            return fitted

        next_size = (
            max(1, int(work.width * scale_factor)),
            max(1, int(work.height * scale_factor)),
        )
        if min(next_size) < min_edge:
            return None
        try:
            work = work.resize(next_size, Image.Resampling.LANCZOS)
        except Exception:
            return None

    return None
def reconcile_image_mimetype_sync(data: bytes, declared: str) -> str:
    """Check a declared ``image/*`` MIME type against what the bytes say.

    The declared value is stripped of any ``;`` parameters before use. If it
    is not an ``image/*`` type the original *declared* string is returned
    untouched. Otherwise :func:`detect_image_mimetype_from_bytes` sniffs the
    data, and a detected type that differs (case-insensitively) from the
    declared one replaces it, with an info line logged for the change.

    Args:
        data: The raw image bytes to sniff.
        declared: The MIME type claimed by the source (may include
            parameters).

    Returns:
        str: The detected type when it disagrees with the declaration;
        otherwise the parameter-stripped declared type (also used when
        detection yields nothing).
    """
    stripped = declared.partition(";")[0].strip()
    stripped_lc = stripped.lower()
    if not stripped_lc.startswith("image/"):
        return declared

    sniffed = detect_image_mimetype_from_bytes(data)
    if not sniffed or sniffed.lower() == stripped_lc:
        # Nothing detected, or detection agrees with the declaration.
        return stripped

    logger.info(
        "Image MIME reconciled: declared %s -> detected %s",
        stripped,
        sniffed,
    )
    return sniffed
async def reconcile_image_mimetype(data: bytes, declared: str) -> str:
    """Run :func:`reconcile_image_mimetype_sync` in a worker thread.

    The synchronous reconcile may end up decoding the image with Pillow, so
    it is dispatched through ``asyncio.to_thread`` and awaited here rather
    than being called directly on the event loop.

    Args:
        data: The raw image bytes to sniff.
        declared: The MIME type claimed by the source.

    Returns:
        str: Whatever :func:`reconcile_image_mimetype_sync` returns for the
        same arguments.
    """
    reconciled = await asyncio.to_thread(
        reconcile_image_mimetype_sync,
        data,
        declared,
    )
    return reconciled
[docs] async def media_to_content_parts( data: bytes, mimetype: str, filename: str, body_text: str | None = None, ) -> list[dict[str, Any]]: """Build an OpenRouter multimodal content-parts list from raw media. Office / ODF documents are transparently converted to PDF via LibreOffice so the LLM never sees an unsupported MIME type. Parameters ---------- data: The raw file bytes. mimetype: MIME type of the file (e.g. ``"image/png"``). filename: Human-readable filename. body_text: Optional caption / message body text to include alongside the media. When present it is prepended as a ``text`` content part. Returns ------- list[dict] A list of content-part dicts suitable for the ``content`` field of an OpenRouter user message. """ parts: list[dict[str, Any]] = [] if body_text: parts.append({"type": "text", "text": body_text}) # Guard against empty/failed media: never emit an empty ``data:`` URI. An # empty image part is sent to the model as a blank image, so it "doesn't # see" the picture with no error anywhere. Surface a text note instead so # the turn stays coherent. if not data: parts.append( { "type": "text", "text": f"[Attachment: {filename or 'file'} — could not be loaded]", } ) return parts # Re-encode GIF / animated WebP -> MP4 before any further # processing so the API receives a well-supported video format. data, mimetype, filename = await maybe_reencode_gif( data, mimetype, filename, ) ext = PurePosixPath(filename).suffix.lower() needs_conversion = ( mimetype in _CONVERTIBLE_MIME_TYPES or ext in _CONVERTIBLE_EXTENSIONS ) # Remap application/* types that are really text (e.g. JSON, XML, # YAML) so they are sent as ``text/plain`` instead of falling into # the unsupported-binary fallback. 
if mimetype in _TEXT_REMAP_MIME_TYPES: mimetype = "text/plain" if needs_conversion: pdf_data = await _convert_to_pdf(data, filename) if pdf_data is not None: b64 = await asyncio.to_thread( lambda: base64.b64encode(pdf_data).decode("ascii"), ) pdf_name = PurePosixPath(filename).stem + ".pdf" parts.append( { "type": "file", "file": { "filename": pdf_name, "file_data": f"data:application/pdf;base64,{b64}", }, } ) else: parts.append( { "type": "text", "text": ( f"[Attachment: {filename} — conversion to PDF failed; " f"the document could not be processed]" ), } ) return parts if mimetype.split("/")[0] == "image": mimetype = await reconcile_image_mimetype(data, mimetype) category = mimetype.split("/")[0] # Text files are inlined directly — skip the base64 encode. if category == "text": try: decoded = data.decode("utf-8") except (UnicodeDecodeError, ValueError): decoded = data.decode("latin-1") parts.append( { "type": "text", "text": ( f'<untrusted_file_contents filename="{filename}">\n' f"{decoded}\n" f"</untrusted_file_contents>" ), } ) return parts b64 = await asyncio.to_thread( lambda: base64.b64encode(data).decode("ascii"), ) if category == "image": parts.append( { "type": "image_url", "image_url": { "url": f"data:{mimetype};base64,{b64}", }, } ) elif category == "audio": # Use the ``file`` content-part with a data URI so the Gemini proxy # can translate it to native ``inline_data``. The old ``input_audio`` # format (raw base64 + format string) is an OpenAI-only structure # that most OpenAI-to-Gemini proxies cannot convert. 
💀🔥 parts.append( { "type": "file", "file": { "filename": filename, "file_data": f"data:{mimetype};base64,{b64}", }, } ) elif category == "video": parts.append( { "type": "video_url", "video_url": { "url": f"data:{mimetype};base64,{b64}", }, } ) elif mimetype == "application/pdf": parts.append( { "type": "file", "file": { "filename": filename, "file_data": f"data:application/pdf;base64,{b64}", }, } ) else: # Unknown binary — try to decode as text; if that fails, send a # placeholder so we never forward an unsupported MIME type. try: decoded = data.decode("utf-8") parts.append( { "type": "text", "text": ( f'<untrusted_file_contents filename="{filename}">\n' f"{decoded}\n" f"</untrusted_file_contents>" ), } ) except (UnicodeDecodeError, ValueError): parts.append( { "type": "text", "text": ( f"[Attachment: {filename} (type: {mimetype}) — " f"unsupported file type, contents omitted]" ), } ) return parts
# ------------------------------------------------------------------ # LibreOffice PDF conversion # ------------------------------------------------------------------ _LIBREOFFICE_BINARIES = ("libreoffice", "soffice", "lowriter") async def _convert_to_pdf( data: bytes, filename: str, timeout: float = 60.0, ) -> bytes | None: """Convert a document to PDF using LibreOffice in headless mode. Returns the PDF bytes on success, or ``None`` on failure. """ temp_dir = tempfile.mkdtemp(prefix="doc2pdf_") try: input_ext = PurePosixPath(filename).suffix or ".tmp" input_path = os.path.join(temp_dir, f"input{input_ext}") with open(input_path, "wb") as f: f.write(data) last_error: Exception | None = None for binary_name in _LIBREOFFICE_BINARIES: binary_path = shutil.which(binary_name) if not binary_path: continue try: proc = await asyncio.create_subprocess_exec( binary_path, "--headless", "--norestore", "--invisible", "--nodefault", "--nolockcheck", "--nofirststartwizard", "--convert-to", "pdf", "--outdir", temp_dir, input_path, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.DEVNULL, cwd=temp_dir, ) stdout, stderr = await asyncio.wait_for( proc.communicate(), timeout=timeout, ) if proc.returncode == 0: break msg = stderr.decode(errors="ignore") if stderr else "" logger.warning( "%s conversion failed (rc=%s): %s", binary_name, proc.returncode, msg[:500], ) last_error = RuntimeError( f"{binary_name} exited with {proc.returncode}", ) except FileNotFoundError as exc: last_error = exc continue except asyncio.TimeoutError: logger.error( "LibreOffice conversion timed out after %.0fs for %s", timeout, filename, ) proc.kill() return None expected_pdf = os.path.join( temp_dir, PurePosixPath(input_path).stem + ".pdf", ) pdf_path = expected_pdf if os.path.exists(expected_pdf) else None if not pdf_path: for entry in os.listdir(temp_dir): if entry.lower().endswith(".pdf"): pdf_path = os.path.join(temp_dir, entry) break if not pdf_path or not 
os.path.exists(pdf_path): logger.error( "PDF conversion produced no output for %s: %s", filename, last_error, ) return None with open(pdf_path, "rb") as f: return f.read() except Exception: logger.error( "Unexpected error converting %s to PDF", filename, exc_info=True, ) return None finally: try: shutil.rmtree(temp_dir) except OSError: pass # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ def _guess_audio_format(filename: str, mimetype: str) -> str: """Best-effort short audio format token derived from filename or MIME type. Produces the bare codec/container token (such as ``wav`` or ``mp3``) that the OpenRouter ``input_audio`` content type expects, preferring a recognized file extension and falling back to the MIME subtype when the extension is unknown or missing. This is a pure string helper with no I/O or side effects. No internal callers were found by grepping the repo; it is retained as a utility for building ``input_audio`` parts (the live media path in :func:`media_to_content_parts` currently sends audio via the data-URI ``file`` part instead). Args: filename: The source filename whose suffix is inspected first. mimetype: The MIME type used as the fallback source of the subtype. Returns: str: A lowercase format token, e.g. ``"mp3"`` or the MIME subtype. """ ext = PurePosixPath(filename).suffix.lstrip(".").lower() known = ( "wav", "mp3", "ogg", "flac", "aac", "m4a", "aiff", "pcm16", "pcm24", ) if ext in known: return ext # Last resort: use the MIME subtype return mimetype.split("/")[-1].lower()