Source code for tools.generate_image

"""Generate images via native Gemini API and send to the current channel."""

from __future__ import annotations

import asyncio
import base64
import hashlib
import jsonutil as json
import logging
from io import BytesIO
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# Native Gemini API endpoint.
# ``_call_gemini_native`` appends ``/models/{model}:generateContent`` to this.
GEMINI_API_BASE = "https://generativelanguage.googleapis.com/v1beta"
# Model used by ``_call_gemini_native`` when the caller passes no ``model``.
DEFAULT_IMAGE_MODEL = "gemini-3.1-flash-image-preview"
# Shared key returned by ``_resolve_api_key`` when no per-user key is found.
# NOTE(review): this is a live-looking credential committed to source. It
# should be rotated and loaded from configuration / a secret store instead of
# being hard-coded here -- confirm with the owners before changing, since
# other modules import this name.
FALLBACK_API_KEY = "AIzaSyCCwz9WCsIKSWsfufU6E-JbPsP1acLhZTU"

# Aspect ratios accepted by ``_call_gemini_native``; any other value is
# replaced with "16:9" there.
SUPPORTED_ASPECT_RATIOS = {
    "1:1",
    "2:3",
    "3:2",
    "3:4",
    "4:3",
    "4:5",
    "5:4",
    "9:16",
    "16:9",
    "21:9",
}

# Response bodies of at least this many bytes (256 KiB) are JSON-parsed in a
# worker thread by ``_call_gemini_native`` rather than on the event loop.
_JSON_BODY_DECODE_THRESHOLD = 256 * 1024


def _json_loads_utf8(body: bytes) -> Any:
    """Parse a UTF-8 encoded JSON payload.

    Synchronous and free of I/O: the bytes are decoded as UTF-8 and the
    resulting text is handed to the module-level ``json`` facade (``jsonutil``
    imported under the name ``json``). Because it is a plain function it can
    be passed to ``asyncio.to_thread``, which is how ``_call_gemini_native``
    uses it for response bodies of at least ``_JSON_BODY_DECODE_THRESHOLD``
    bytes.

    Args:
        body: Raw JSON document encoded as UTF-8.

    Returns:
        The value described by the JSON document (a ``dict`` for the Gemini
        responses handled in this module, but any JSON type is possible).

    Raises:
        UnicodeDecodeError: If ``body`` is not valid UTF-8.
        ValueError: If the text is not valid JSON. The concrete exception
            class is whatever the ``jsonutil`` backend raises.
    """
    text = body.decode("utf-8")
    return json.loads(text)


# System instruction attached to every request built by
# ``_call_gemini_native`` (sent as ``systemInstruction.parts[0].text``).
# The text is part of the model's input, so edits here change generated output.
IMAGE_GENERATION_SYSTEM_PROMPT = (
    "You are an expert image generation artist specializing in "
    "extremely high-quality, detailed artwork.\n\n"
    "DEFAULT STYLE - ANIME/ILLUSTRATION:\n"
    "By default, generate images in a premium anime/illustration "
    "style with rich, vibrant color palettes, masterful lighting, "
    "expressive character designs, beautiful backgrounds, smooth "
    "gradients, and cinematic quality.\n\n"
    "REALISM MODE:\n"
    "When the user explicitly requests realistic or photorealistic "
    "imagery, switch to hyperrealistic rendering with photographic "
    "accuracy, natural lighting, and true-to-life textures.\n\n"
    "QUALITY STANDARDS:\n"
    "Always aim for the highest possible quality. Create visually "
    "striking images with excellent composition, lighting, and "
    "color harmony."
)

# Tool metadata: the name, human-readable description, and a JSON-Schema-style
# description of the arguments accepted by ``run``.
# NOTE(review): the aspect-ratio list and default model in the descriptions
# below are written out by hand; keep them in sync with
# ``SUPPORTED_ASPECT_RATIOS`` and ``DEFAULT_IMAGE_MODEL``.
TOOL_NAME = "generate_image"
TOOL_DESCRIPTION = (
    "Generate an AI image from a text prompt using Gemini and "
    "send it to the current channel. Supports multiple "
    "aspect ratios."
)
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "prompt": {
            "type": "string",
            "description": ("Text description of the image to generate."),
        },
        "aspect_ratio": {
            "type": "string",
            "description": (
                "Aspect ratio. Supported: 1:1, 2:3, 3:2, 3:4, "
                "4:3, 4:5, 5:4, 9:16, 16:9, 21:9. "
                "Default: 16:9."
            ),
        },
        "model": {
            "type": "string",
            "description": (
                "Model name. Default: gemini-3.1-flash-image-preview. "
                "Also available: gemini-3-pro-image-preview."
            ),
        },
    },
    # Only the prompt is mandatory; ``run`` supplies defaults for the rest.
    "required": ["prompt"],
}


# Error template returned by ``run`` (inside a JSON ``error`` field) when the
# shared-key daily quota is exhausted. ``{current}`` and ``{limit}`` are
# filled in with ``str.format`` from the values ``check_default_key_limit``
# returns.
IMAGE_RATE_LIMIT_ERROR = (
    "User has reached their daily image generation limit ({current}/{limit}). "
    "Image generation is expensive, and we can't subsidize this at scale "
    "for everyone. To unlock unlimited image generation, provide your own "
    "Gemini API key:\n"
    "1. Get a key at: https://aistudio.google.com/apikey\n"
    "2. Send it via DM: set_user_api_key service=gemini api_key=YOUR_KEY\n"
    "Your own key has no daily limit."
)

# Passed as ``daily_limit`` to ``check_default_key_limit`` by ``run``.
_IMAGE_DAILY_LIMIT = 5


async def _default_key_image_quota_applies(
    ctx: ToolContext | None,
    using_own_key: bool,
) -> bool:
    """Report whether the shared-key daily image quota should be enforced.

    This function holds no policy of its own: it forwards both arguments to
    ``tools.manage_api_keys.default_key_limit_applies`` and returns that
    coroutine's answer unchanged. The delegate is imported lazily inside the
    function body.

    ``run`` in this module uses the result to decide whether to check and to
    increment the daily usage counter.

    Args:
        ctx: The current ``ToolContext``, or ``None``; passed through to the
            delegate untouched.
        using_own_key: ``True`` when the request is being made with the
            user's own Gemini key rather than the shared fallback key.

    Returns:
        ``True`` when the daily quota applies to this call, ``False`` when
        the delegate reports the caller as exempt.
    """
    from tools.manage_api_keys import default_key_limit_applies as _limit_applies

    enforce = await _limit_applies(ctx, using_own_key=using_own_key)
    return enforce


async def _resolve_api_key(ctx: ToolContext | None) -> tuple[str, bool]:
    """Pick the Gemini API key for this request, preferring the user's own.

    When ``ctx`` carries a truthy ``user_id``, the per-user ``"gemini"`` key
    is requested from ``tools.manage_api_keys.get_user_api_key``, passing
    along the context's ``redis``, ``channel_id`` and ``config`` attributes
    (each ``None`` if absent). A truthy result is returned as the user's own
    key. In every other case -- no context, no user id, no stored key, or any
    exception raised by the lookup -- the shared ``FALLBACK_API_KEY`` is
    returned instead. Lookup exceptions are logged at warning level and never
    propagate to the caller.

    Args:
        ctx: The current ``ToolContext`` or ``None``.

    Returns:
        ``(api_key, using_own_key)``. The flag is ``True`` only when the key
        came from the per-user lookup; ``run`` uses it to decide whether the
        shared daily quota applies.
    """
    # Nothing to look up without a context that identifies a user.
    if ctx is None or not getattr(ctx, "user_id", None):
        return FALLBACK_API_KEY, False

    try:
        from tools.manage_api_keys import get_user_api_key

        personal_key = await get_user_api_key(
            ctx.user_id,
            "gemini",
            redis_client=getattr(ctx, "redis", None),
            channel_id=getattr(ctx, "channel_id", None),
            config=getattr(ctx, "config", None),
        )
    except Exception as exc:
        # Best effort: a broken key store must not block image generation.
        logger.warning("Failed to resolve user Gemini key: %s", exc)
    else:
        if personal_key:
            return personal_key, True

    return FALLBACK_API_KEY, False


async def _call_gemini_native(
    prompt_parts: list[dict[str, Any]],
    api_key: str,
    aspect_ratio: str = "16:9",
    model: str | None = None,
) -> bytes | None:
    """Call the native Gemini image API and return the raw image bytes.

    Builds a ``generateContent`` request whose user content is
    ``prompt_parts``, with ``IMAGE_GENERATION_SYSTEM_PROMPT`` as the system
    instruction and both ``TEXT`` and ``IMAGE`` requested as response
    modalities, then POSTs it with an ``httpx.AsyncClient`` (120 s timeout).
    The response is parsed as ``candidates[0].content.parts[]`` and the first
    part whose ``inlineData.data`` base64-decodes successfully is returned.

    Response bodies of at least ``_JSON_BODY_DECODE_THRESHOLD`` bytes are
    parsed in a worker thread via ``asyncio.to_thread`` so a large payload
    does not block the event loop; smaller ones are parsed inline.

    Args:
        prompt_parts: The Gemini ``parts`` list forming the user content.
        api_key: Gemini API key, sent in the ``x-goog-api-key`` header.
        aspect_ratio: Requested aspect ratio. Values not in
            ``SUPPORTED_ASPECT_RATIOS`` are silently replaced with ``"16:9"``.
        model: Model id; ``DEFAULT_IMAGE_MODEL`` is used when falsy. The id is
            percent-encoded before being placed in the URL path.

    Returns:
        The decoded image bytes, or ``None`` when the API answers with a
        non-200 status, returns no candidates, returns a response whose shape
        is not the expected nesting of dicts/lists, or contains no decodable
        inline image.

    Raises:
        httpx.HTTPError: Transport-level failures (timeouts, connection
            errors) are not caught here and propagate to the caller.
        UnicodeDecodeError: If a 200 response body is not valid UTF-8.
        ValueError: If a 200 response body is not valid JSON (exact class
            depends on the ``jsonutil`` backend).
    """
    from urllib.parse import quote

    import httpx

    if aspect_ratio not in SUPPORTED_ASPECT_RATIOS:
        aspect_ratio = "16:9"

    # The model id can come from a tool argument, so escape it: an unescaped
    # "/", "?" or "#" would otherwise change which endpoint is addressed.
    model_id = quote(model or DEFAULT_IMAGE_MODEL, safe="")
    url = f"{GEMINI_API_BASE}/models/{model_id}:generateContent"

    payload: dict[str, Any] = {
        "contents": [{"parts": prompt_parts}],
        "systemInstruction": {
            "parts": [{"text": IMAGE_GENERATION_SYSTEM_PROMPT}],
        },
        "generationConfig": {
            "responseModalities": ["TEXT", "IMAGE"],
            "imageConfig": {"aspectRatio": aspect_ratio},
        },
    }

    headers = {
        "x-goog-api-key": api_key,
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient(timeout=120.0) as http:
        resp = await http.post(url, headers=headers, json=payload)
        if resp.status_code != 200:
            logger.error(
                "Gemini API error: %d - %s",
                resp.status_code,
                resp.text[:500],
            )
            return None
        raw = await resp.aread()

    # Same decode/parse on both paths; only large bodies leave the event loop.
    if len(raw) >= _JSON_BODY_DECODE_THRESHOLD:
        result = await asyncio.to_thread(_json_loads_utf8, raw)
    else:
        result = _json_loads_utf8(raw)

    # Parse native Gemini response: candidates[0].content.parts[]
    # Every level is type-checked so an explicit JSON ``null`` or an
    # unexpected shape yields ``None`` instead of an AttributeError.
    candidates = result.get("candidates") if isinstance(result, dict) else None
    if not isinstance(candidates, list) or not candidates:
        logger.warning("Gemini API returned no candidates")
        return None

    first = candidates[0]
    content = first.get("content") if isinstance(first, dict) else None
    parts = content.get("parts") if isinstance(content, dict) else None
    if not isinstance(parts, list):
        return None

    for part in parts:
        inline_data = part.get("inlineData") if isinstance(part, dict) else None
        if not isinstance(inline_data, dict):
            continue
        encoded = inline_data.get("data")
        if not encoded:
            continue
        try:
            return base64.b64decode(encoded)
        except (ValueError, TypeError) as exc:
            # A corrupt part is logged and skipped; later parts are still tried.
            logger.error("Base64 decode failed: %s", exc)
    return None


async def run(
    prompt: str,
    aspect_ratio: str = "16:9",
    model: str | None = None,
    ctx: ToolContext | None = None,
) -> str:
    """Generate an image from a prompt and post it to the current channel.

    Entry point for the ``generate_image`` tool. The steps are:

    1. Resolve a Gemini key with ``_resolve_api_key`` (user's own key
       preferred, shared fallback otherwise).
    2. Ask ``_default_key_image_quota_applies`` once whether the shared daily
       quota applies. If it does, ``check_default_key_limit`` is consulted
       with ``_IMAGE_DAILY_LIMIT`` and an exhausted quota ends the call with
       the ``IMAGE_RATE_LIMIT_ERROR`` message.
    3. Render the image with ``_call_gemini_native``, re-encode it as PNG in a
       worker thread, and upload it with ``ctx.adapter.send_file``. The file
       is also appended to ``ctx.sent_files``.
    4. When the quota applies, record the usage with
       ``increment_default_key_usage``.

    Args:
        prompt: Text description of the image to generate.
        aspect_ratio: Requested aspect ratio (default ``"16:9"``); validated
            by ``_call_gemini_native``.
        model: Optional Gemini image model id; ``_call_gemini_native`` applies
            the default when omitted.
        ctx: The current ``ToolContext``. Its ``adapter`` sends the file, and
            ``channel_id``, ``user_id``, ``redis`` and ``sent_files`` are read
            along the way.

    Returns:
        On success, a JSON string with ``success``, ``filename``, ``result``
        and -- when the adapter returned one -- ``file_url``. On a quota
        block, an empty model result, or any exception raised during
        generation/upload, a JSON string with a single ``error`` field.
        When ``ctx`` or ``ctx.adapter`` is ``None`` the return value is the
        plain (non-JSON) string ``"Error: No platform adapter available."``.

    Raises:
        Exception: Errors raised by the quota check itself are not caught;
            only the generation/upload phase is wrapped.
    """
    if ctx is None or ctx.adapter is None:
        return "Error: No platform adapter available."

    api_key, using_own_key = await _resolve_api_key(ctx)

    # Rate-limit default/fallback key users (exempt: admin, BYPASS_RATELIMIT, own key).
    # Evaluated once so the post-send usage increment always matches this
    # gate decision and no second privilege lookup is needed.
    quota_applies = await _default_key_image_quota_applies(ctx, using_own_key)
    if quota_applies:
        redis = getattr(ctx, "redis", None)
        from tools.manage_api_keys import check_default_key_limit

        allowed, current, limit = await check_default_key_limit(
            ctx.user_id,
            "image_generation",
            redis,
            daily_limit=_IMAGE_DAILY_LIMIT,
        )
        if not allowed:
            return json.dumps(
                {
                    "error": IMAGE_RATE_LIMIT_ERROR.format(
                        current=current,
                        limit=limit,
                    ),
                }
            )

    prompt_parts = [{"text": prompt}]

    try:
        img_bytes = await _call_gemini_native(
            prompt_parts,
            api_key,
            aspect_ratio,
            model,
        )
        if not img_bytes:
            return json.dumps(
                {
                    "error": "No image was generated by the model.",
                }
            )

        # Imported here so a missing Pillow install is reported through the
        # JSON error path below instead of breaking module import.
        from PIL import Image

        def _convert_to_png(data: bytes) -> bytes:
            """Re-encode image bytes as PNG.

            Opens ``data`` with Pillow from an in-memory buffer and saves it
            to a new in-memory buffer in PNG format. Synchronous; ``run``
            calls it through ``asyncio.to_thread``.

            Args:
                data: Image bytes in any format Pillow can open.

            Returns:
                The image encoded as PNG bytes.
            """
            buf = BytesIO()
            with Image.open(BytesIO(data)) as img:
                img.save(buf, format="PNG")
            return buf.getvalue()

        png_bytes = await asyncio.to_thread(_convert_to_png, img_bytes)

        # Content-derived name: identical images get identical filenames.
        digest = hashlib.sha256(png_bytes).hexdigest()[:16]
        fname = f"generated_{digest}.png"

        file_url = await ctx.adapter.send_file(
            ctx.channel_id,
            png_bytes,
            fname,
            "image/png",
        )

        ctx.sent_files.append(
            {
                "data": png_bytes,
                "filename": fname,
                "mimetype": "image/png",
                "file_url": file_url or "",
            }
        )

        # Usage is counted only after the image was actually delivered.
        if quota_applies:
            redis = getattr(ctx, "redis", None)
            from tools.manage_api_keys import increment_default_key_usage

            await increment_default_key_usage(
                ctx.user_id,
                "image_generation",
                redis,
            )

        result: dict[str, Any] = {
            "success": True,
            "filename": fname,
            "result": "Image generated and sent to the channel.",
        }
        if file_url:
            result["file_url"] = file_url
        return json.dumps(result)

    except Exception as exc:
        # Tool boundary: report the failure to the caller as JSON and keep
        # the traceback in the log.
        logger.exception("Image generation error: %s", exc)
        return json.dumps({"error": f"Image generation failed: {exc}"})