"""Generate images via native Gemini API and send to the current channel."""
from __future__ import annotations
import asyncio
import base64
import hashlib
import jsonutil as json
import logging
from io import BytesIO
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
# Native Gemini API endpoint
GEMINI_API_BASE = "https://generativelanguage.googleapis.com/v1beta"
# Model used by _call_gemini_native when the caller does not pass one.
DEFAULT_IMAGE_MODEL = "gemini-3.1-flash-image-preview"
# Shared key returned by _resolve_api_key when no per-user key can be resolved.
# NOTE(review): this looks like a live credential committed to source control.
# It should presumably be loaded from configuration / secret storage and the
# committed value rotated -- confirm with the owners before changing it.
FALLBACK_API_KEY = "AIzaSyCCwz9WCsIKSWsfufU6E-JbPsP1acLhZTU"
# Aspect ratios accepted by _call_gemini_native; any other value is coerced to
# "16:9" there.
SUPPORTED_ASPECT_RATIOS = {
"1:1",
"2:3",
"3:2",
"3:4",
"4:3",
"4:5",
"5:4",
"9:16",
"16:9",
"21:9",
}
# Response bodies of at least this many bytes (256 KiB) are JSON-parsed in a
# worker thread by _call_gemini_native rather than on the event loop.
_JSON_BODY_DECODE_THRESHOLD = 256 * 1024
def _json_loads_utf8(body: bytes) -> Any:
    """Turn a UTF-8 encoded JSON payload into Python objects.

    The bytes are decoded as UTF-8 and the resulting text is handed to the
    module-level ``json`` object (``jsonutil`` imported under that name).
    The function is synchronous and touches nothing but its argument, which
    is what makes it suitable as an ``asyncio.to_thread`` target:
    ``_call_gemini_native`` uses it that way for response bodies of at least
    ``_JSON_BODY_DECODE_THRESHOLD`` bytes so the parse does not block the
    event loop.

    Args:
        body: Raw JSON document encoded as UTF-8.

    Returns:
        Whatever the JSON document decodes to (usually a ``dict`` for Gemini
        responses).

    Raises:
        UnicodeDecodeError: If ``body`` is not valid UTF-8.
        ValueError: If the decoded text is not valid JSON (the concrete
            subclass depends on the JSON backend in use).
    """
    decoded_text = body.decode("utf-8")
    return json.loads(decoded_text)
# System instruction sent with every request built by _call_gemini_native
# (placed in the payload's "systemInstruction" text part). The text is sent to
# the model verbatim, so edits here change generation behaviour.
IMAGE_GENERATION_SYSTEM_PROMPT = (
"You are an expert image generation artist specializing in "
"extremely high-quality, detailed artwork.\n\n"
"DEFAULT STYLE - ANIME/ILLUSTRATION:\n"
"By default, generate images in a premium anime/illustration "
"style with rich, vibrant color palettes, masterful lighting, "
"expressive character designs, beautiful backgrounds, smooth "
"gradients, and cinematic quality.\n\n"
"REALISM MODE:\n"
"When the user explicitly requests realistic or photorealistic "
"imagery, switch to hyperrealistic rendering with photographic "
"accuracy, natural lighting, and true-to-life textures.\n\n"
"QUALITY STANDARDS:\n"
"Always aim for the highest possible quality. Create visually "
"striking images with excellent composition, lighting, and "
"color harmony."
)
# Tool metadata. Per the ``run`` docstring, tool_loader.py registers ``run``
# under TOOL_NAME; TOOL_DESCRIPTION and TOOL_PARAMETERS are presumably the
# description/schema advertised to the model -- confirm against tool_loader.py.
TOOL_NAME = "generate_image"
TOOL_DESCRIPTION = (
"Generate an AI image from a text prompt using Gemini and "
"send it to the current channel. Supports multiple "
"aspect ratios."
)
# JSON-Schema-style description of the arguments accepted by ``run``; only
# "prompt" is required.
# NOTE(review): the aspect-ratio list and the default model named in the
# descriptions below are written out by hand and duplicate
# SUPPORTED_ASPECT_RATIOS / DEFAULT_IMAGE_MODEL; keep them in sync manually.
TOOL_PARAMETERS = {
"type": "object",
"properties": {
"prompt": {
"type": "string",
"description": ("Text description of the image to generate."),
},
"aspect_ratio": {
"type": "string",
"description": (
"Aspect ratio. Supported: 1:1, 2:3, 3:2, 3:4, "
"4:3, 4:5, 5:4, 9:16, 16:9, 21:9. "
"Default: 16:9."
),
},
"model": {
"type": "string",
"description": (
"Model name. Default: gemini-3.1-flash-image-preview. "
"Also available: gemini-3-pro-image-preview."
),
},
},
"required": ["prompt"],
}
# Error template returned by ``run`` (inside a JSON "error" field) when a
# shared-key user is over quota. It is filled via str.format with ``current``
# and ``limit``, so any literal braces added here would need escaping.
IMAGE_RATE_LIMIT_ERROR = (
"User has reached their daily image generation limit ({current}/{limit}). "
"Image generation is expensive, and we can't subsidize this at scale "
"for everyone. To unlock unlimited image generation, provide your own "
"Gemini API key:\n"
"1. Get a key at: https://aistudio.google.com/apikey\n"
"2. Send it via DM: set_user_api_key service=gemini api_key=YOUR_KEY\n"
"Your own key has no daily limit."
)
# Passed by ``run`` as ``daily_limit`` to check_default_key_limit for the
# "image_generation" counter; applies only when the shared-key quota is enforced.
_IMAGE_DAILY_LIMIT = 5
async def _default_key_image_quota_applies(
    ctx: ToolContext | None,
    using_own_key: bool,
) -> bool:
    """Report whether the shared-key daily image quota covers this call.

    The decision is delegated entirely to
    ``tools.manage_api_keys.default_key_limit_applies``; this wrapper adds no
    logic of its own, so the exemption rules stay defined in one place. The
    delegate is awaited, and the result is returned unchanged.

    ``run`` calls this before generating (to decide whether to gate the
    request) and again after a successful send (to decide whether to count
    the usage).

    Args:
        ctx: The active ``ToolContext`` or ``None``; forwarded as-is to the
            delegate.
        using_own_key: ``True`` when the request runs on the user's own
            Gemini key; forwarded as the ``using_own_key`` keyword.

    Returns:
        The delegate's verdict: truthy when the shared quota must be
        enforced, falsy when the caller is exempt.
    """
    # Imported at call time, like the other manage_api_keys imports here.
    from tools.manage_api_keys import default_key_limit_applies as limit_applies

    quota_applies = await limit_applies(ctx, using_own_key=using_own_key)
    return quota_applies
async def _resolve_api_key(ctx: ToolContext | None) -> tuple[str, bool]:
    """Pick the Gemini API key for this request, favouring the user's own.

    With a context that carries a truthy ``user_id``, the per-user key is
    looked up through ``tools.manage_api_keys.get_user_api_key`` for the
    ``"gemini"`` service, passing along the context's ``redis``,
    ``channel_id`` and ``config`` attributes (each ``None`` when absent).
    A non-empty result is returned together with ``True``.

    In every other situation -- no context, no user id, an empty lookup
    result, or any exception raised by the import or the lookup -- the
    module-level ``FALLBACK_API_KEY`` is returned together with ``False``.
    Lookup exceptions are logged as warnings and never propagate.

    Args:
        ctx: The active ``ToolContext`` or ``None``.

    Returns:
        ``(api_key, using_own_key)`` where ``using_own_key`` is ``True`` only
        if the key came from the per-user lookup.
    """
    # Guard clause: without an identifiable user only the shared key applies.
    if ctx is None or not getattr(ctx, "user_id", None):
        return FALLBACK_API_KEY, False

    try:
        from tools.manage_api_keys import get_user_api_key

        lookup_options = {
            "redis_client": getattr(ctx, "redis", None),
            "channel_id": getattr(ctx, "channel_id", None),
            "config": getattr(ctx, "config", None),
        }
        stored_key = await get_user_api_key(ctx.user_id, "gemini", **lookup_options)
        if stored_key:
            return stored_key, True
    except Exception as exc:
        # Best effort: a failed lookup must not block generation.
        logger.warning("Failed to resolve user Gemini key: %s", exc)

    return FALLBACK_API_KEY, False
async def _call_gemini_native(
    prompt_parts: list[dict[str, Any]],
    api_key: str,
    aspect_ratio: str = "16:9",
    model: str | None = None,
) -> bytes | None:
    """Call the native Gemini image API and return the first image's bytes.

    Builds a ``generateContent`` request whose user content is
    ``prompt_parts``, whose system instruction is
    ``IMAGE_GENERATION_SYSTEM_PROMPT`` and whose generation config asks for
    ``TEXT`` and ``IMAGE`` modalities at ``aspect_ratio``. The request is
    POSTed with an ``httpx.AsyncClient`` using a 120 second timeout. Bodies of
    at least ``_JSON_BODY_DECODE_THRESHOLD`` bytes are parsed in a worker
    thread via ``asyncio.to_thread(_json_loads_utf8, ...)``; smaller ones are
    parsed inline with the same helper.

    The response is read as ``candidates[0].content.parts[]``; the first part
    carrying non-empty ``inlineData.data`` that base64-decodes successfully is
    returned. Missing or ``null`` intermediate fields are treated as empty
    rather than raising.

    Called by ``run`` in this module. The earlier documentation also lists
    ``tools/generate_background.py``, ``tools/edit_image.py`` and
    ``background_agents/game_art_agent.py`` as importers (not verifiable from
    this module), so the signature is kept unchanged.

    Args:
        prompt_parts: Gemini ``parts`` list forming the user content.
        api_key: Gemini API key, sent in the ``x-goog-api-key`` header.
        aspect_ratio: Requested ratio; silently replaced by ``"16:9"`` when
            not in ``SUPPORTED_ASPECT_RATIOS``.
        model: Model id; ``DEFAULT_IMAGE_MODEL`` is used when falsy. The id is
            percent-encoded before being placed in the URL path.

    Returns:
        The decoded image bytes, or ``None`` when the API answers with a
        non-200 status, returns no candidates, or returns no decodable
        inline image. Each of those cases is logged.

    Raises:
        httpx.HTTPError: Transport failures (timeouts, connection errors) are
            not caught here and propagate to the caller.
        UnicodeDecodeError, ValueError: If a 200 response body is not valid
            UTF-8 JSON.
    """
    import httpx
    from urllib.parse import quote

    if aspect_ratio not in SUPPORTED_ASPECT_RATIOS:
        aspect_ratio = "16:9"
    model = model or DEFAULT_IMAGE_MODEL

    # The model id can originate from tool arguments. Percent-encode it so it
    # always stays a single path segment ("/", "?", "#" cannot redirect the
    # request); ordinary ids made of letters, digits, "." and "-" are unchanged.
    model_segment = quote(str(model), safe="")
    url = f"{GEMINI_API_BASE}/models/{model_segment}:generateContent"

    payload: dict[str, Any] = {
        "contents": [{"parts": prompt_parts}],
        "systemInstruction": {
            "parts": [{"text": IMAGE_GENERATION_SYSTEM_PROMPT}],
        },
        "generationConfig": {
            "responseModalities": ["TEXT", "IMAGE"],
            "imageConfig": {"aspectRatio": aspect_ratio},
        },
    }
    headers = {
        "x-goog-api-key": api_key,
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient(timeout=120.0) as http:
        resp = await http.post(url, headers=headers, json=payload)
        if resp.status_code != 200:
            logger.error(
                "Gemini API error: %d - %s",
                resp.status_code,
                resp.text[:500],
            )
            return None
        raw = await resp.aread()

    # Image responses carry base64 payloads and can be large; keep big parses
    # off the event loop. Both branches share the same decode helper.
    if len(raw) >= _JSON_BODY_DECODE_THRESHOLD:
        result = await asyncio.to_thread(_json_loads_utf8, raw)
    else:
        result = _json_loads_utf8(raw)

    # Parse native Gemini response: candidates[0].content.parts[]
    candidates = result.get("candidates") if isinstance(result, dict) else None
    if not candidates:
        logger.warning("Gemini API returned no candidates")
        return None

    first_candidate = candidates[0]
    # ``or`` (rather than a .get default) also covers explicit JSON nulls.
    content = first_candidate.get("content") or {}
    parts = content.get("parts") or []
    for part in parts:
        inline_data = part.get("inlineData") or {}
        encoded = inline_data.get("data")
        if not encoded:
            continue
        try:
            return base64.b64decode(encoded)
        except (ValueError, TypeError) as exc:
            # binascii.Error is a ValueError subclass; try the next part.
            logger.error("Base64 decode failed: %s", exc)

    logger.warning(
        "Gemini response contained no image data (finishReason=%s)",
        first_candidate.get("finishReason"),
    )
    return None
# --- Tool entry point ---
async def run(
    prompt: str,
    aspect_ratio: str = "16:9",
    model: str | None = None,
    ctx: ToolContext | None = None,
) -> str:
    """Generate an image from a prompt and post it to the current channel.

    Entry point of the ``generate_image`` tool. The steps are:

    1. Resolve a Gemini key with ``_resolve_api_key`` (the user's own key is
       preferred over the shared fallback).
    2. Ask ``_default_key_image_quota_applies`` once whether the shared-key
       daily quota covers this call. If so, ``check_default_key_limit`` is
       consulted with ``_IMAGE_DAILY_LIMIT`` and an exhausted quota returns
       ``IMAGE_RATE_LIMIT_ERROR`` as a JSON error.
    3. Render the image with ``_call_gemini_native``, re-encode it as PNG in a
       worker thread, upload it through ``ctx.adapter.send_file`` and record
       it in ``ctx.sent_files``.
    4. If the quota applied, count the usage with
       ``increment_default_key_usage``. A failure at this step is logged only:
       the image has already been delivered, so the call still reports
       success instead of inviting a retry.

    Args:
        prompt: Text description of the image to generate. A blank string is
            rejected before any network call.
        aspect_ratio: Requested aspect ratio (default ``"16:9"``), forwarded
            to ``_call_gemini_native``.
        model: Optional Gemini image model id, forwarded to
            ``_call_gemini_native``, which applies the default.
        ctx: The active ``ToolContext``. Its ``adapter`` sends the file; its
            ``user_id`` and ``redis`` are used for quota bookkeeping.

    Returns:
        On success, a JSON string with ``success``, ``filename``, ``result``
        and, when the adapter returned one, ``file_url``. On failure, a JSON
        string with an ``error`` field (blank prompt, quota exhausted, no
        image produced, or an exception during generation/upload). The one
        exception is a missing ``ctx`` or ``ctx.adapter``, which returns the
        plain, non-JSON string ``"Error: No platform adapter available."``.

    Raises:
        Exception: Errors raised by the pre-generation quota check are not
            caught here and propagate to the caller.
    """
    if ctx is None or ctx.adapter is None:
        return "Error: No platform adapter available."

    # Reject blank prompts up front rather than spending an API round trip.
    if isinstance(prompt, str) and not prompt.strip():
        return json.dumps({"error": "A non-empty prompt is required."})

    api_key, using_own_key = await _resolve_api_key(ctx)

    # Rate-limit default/fallback key users (exempt: admin, BYPASS_RATELIMIT,
    # own key). The decision is taken once and reused after the send, so the
    # gate and the usage counter can never disagree within one call.
    quota_applies = await _default_key_image_quota_applies(ctx, using_own_key)
    if quota_applies:
        from tools.manage_api_keys import check_default_key_limit

        allowed, current, limit = await check_default_key_limit(
            ctx.user_id,
            "image_generation",
            getattr(ctx, "redis", None),
            daily_limit=_IMAGE_DAILY_LIMIT,
        )
        if not allowed:
            return json.dumps(
                {
                    "error": IMAGE_RATE_LIMIT_ERROR.format(
                        current=current,
                        limit=limit,
                    ),
                }
            )

    prompt_parts = [{"text": prompt}]
    try:
        img_bytes = await _call_gemini_native(
            prompt_parts,
            api_key,
            aspect_ratio,
            model,
        )
        if not img_bytes:
            return json.dumps(
                {
                    "error": "No image was generated by the model.",
                }
            )

        from PIL import Image

        def _convert_to_png(data: bytes) -> bytes:
            """Re-encode image bytes in any Pillow-readable format as PNG.

            Synchronous helper run via ``asyncio.to_thread``. The source image
            is opened as a context manager so Pillow releases it as soon as
            the PNG has been written.

            Args:
                data: Raw image bytes returned by the model.

            Returns:
                The same image encoded as PNG.
            """
            buf = BytesIO()
            with Image.open(BytesIO(data)) as img:
                img.save(buf, format="PNG")
            return buf.getvalue()

        png_bytes = await asyncio.to_thread(_convert_to_png, img_bytes)

        # Content-derived name: identical images get identical filenames.
        h = hashlib.sha256(png_bytes).hexdigest()[:16]
        fname = f"generated_{h}.png"

        file_url = await ctx.adapter.send_file(
            ctx.channel_id,
            png_bytes,
            fname,
            "image/png",
        )
        ctx.sent_files.append(
            {
                "data": png_bytes,
                "filename": fname,
                "mimetype": "image/png",
                "file_url": file_url or "",
            }
        )
    except Exception as exc:
        logger.exception("Image generation error: %s", exc)
        return json.dumps({"error": f"Image generation failed: {exc}"})

    if quota_applies:
        # The image is already in the channel. A bookkeeping failure must not
        # turn this into a reported generation failure.
        try:
            from tools.manage_api_keys import increment_default_key_usage

            await increment_default_key_usage(
                ctx.user_id,
                "image_generation",
                getattr(ctx, "redis", None),
            )
        except Exception:
            logger.error(
                "Failed to record image generation usage",
                exc_info=True,
            )

    result: dict[str, Any] = {
        "success": True,
        "filename": fname,
        "result": "Image generated and sent to the channel.",
    }
    if file_url:
        result["file_url"] = file_url
    return json.dumps(result)