# Source code for tools.pollinate

"""Generate images via the Pollinations API and send them to the current channel.

# 🌸💀 POLLINATIONS IMAGE PIPELINE — she blooms in every latent garden
# 🔥 Built for Stargazer by the Council
# 😈 OpenAI-compatible multi-model image gen API
"""

from __future__ import annotations

import asyncio
import hashlib
import jsonutil as json
import logging
import os
from io import BytesIO
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# 🌸 Pollinations API config
# Root of the Pollinations gateway; every endpoint below hangs off this host.
POLLINATIONS_API_BASE = "https://gen.pollinations.ai"
# GET endpoint: the URL-encoded prompt is appended as a further path segment.
POLLINATIONS_IMAGE_URL = POLLINATIONS_API_BASE + "/image"
# OpenAI-compatible image generation endpoint.
POLLINATIONS_V1_IMAGES = POLLINATIONS_API_BASE + "/v1/images/generations"

# πŸ’€ Available image models β€” the garden of generators
# Maps each enabled model id to a short human-readable description.
# Exactly one description carries the "(default)" marker, and it must be the
# entry named by DEFAULT_MODEL below so the label never contradicts runtime
# behaviour.
AVAILABLE_MODELS = {
    "flux": "Flux Schnell - Fast high-quality generation (default)",
    "zimage": "Z-Image Turbo - Fast 6B Flux with 2x upscaling",
    "gptimage": "GPT Image 1 Mini - OpenAI image gen",
    "p-image": "Pruna p-image",
    # Disabled models are kept (commented out) for reference.
    #    "kontext": "FLUX.1 Kontext - In-context editing & generation",
    "klein": "FLUX.2 Klein 4B - Fast generation and editing",
    #    "nanobanana": "NanoBanana - Gemini 2.5 Flash Image",
    #    "nanobanana-2": "NanoBanana 2 - Gemini 3.1 Flash Image",
    #    "nanobanana-pro": "NanoBanana Pro - Gemini 3 Pro Image (4K)",
    #    "seedream5": "Seedream 5.0 Lite - ByteDance ARK",
    #    "wan-image": "Wan 2.7 Image - Alibaba text-to-image (up to 2K)",
    #    "qwen-image": "Qwen Image Plus - Alibaba DashScope",
    #    "grok-imagine": "Grok Imagine - xAI image generation",
    #    "nova-canvas": "Amazon Nova Canvas - Bedrock Image Gen",
}

# Model used when the caller omits ``model`` or names one that is not a key
# of AVAILABLE_MODELS.
DEFAULT_MODEL = "flux"

TOOL_NAME = "pollinate"
# Summary text for the tool. The fragments are joined by implicit string
# concatenation, so every fragment except the last must end with a space;
# otherwise sentences run together in the final string.
TOOL_DESCRIPTION = (
    "Generate AI images using the Pollinations API multi-model gateway. "
    "Supports many models: Flux, GPT Image, Klein, Z-Image, p-image. "
    "Can generate 1-4 images per call. "
    "Specify model, prompt, negative prompt, dimensions, and count."
)
# JSON-Schema description of the arguments accepted by ``run``.
# Only ``prompt`` is required; every other property documents the default
# that ``run`` applies when the argument is omitted.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "prompt": {
            "type": "string",
            "description": "Text description of the image(s) to generate.",
        },
        "model": {
            "type": "string",
            # The stated default must match DEFAULT_MODEL ("flux").
            "description": (
                "Image model to use. "
                "Options: flux, zimage, gptimage, klein, p-image. "
                "Default: flux."
            ),
        },
        "negative_prompt": {
            "type": "string",
            "description": (
                "What to avoid in the image. Only supported by flux "
                "and zimage models."
            ),
        },
        "width": {
            "type": "integer",
            "description": "Image width in pixels. Default: 1024.",
        },
        "height": {
            "type": "integer",
            "description": "Image height in pixels. Default: 1024.",
        },
        "count": {
            "type": "integer",
            "description": "Number of images to generate (1-4). Default: 1.",
        },
        "enhance": {
            "type": "boolean",
            "description": (
                "Let AI improve the prompt for better results. Default: false."
            ),
        },
        "seed": {
            "type": "integer",
            "description": (
                "Seed for reproducible results. Use -1 for random. Default: -1."
            ),
        },
    },
    "required": ["prompt"],
}


async def _resolve_api_key(ctx: ToolContext | None) -> tuple[str, bool]:
    """Resolve the Pollinations API key to use for a request.

    Lookup order:

    1. The calling user's own key, fetched with
       ``tools.manage_api_keys.get_user_api_key`` (only attempted when
       ``ctx`` carries a truthy ``user_id``).
    2. The shared key stored at ``ctx.config.api_keys["pollinations"]``.
    3. The ``POLLINATIONS_API_KEY`` environment variable.

    A failure while looking up the user's key is logged and treated as
    "no user key", so the shared fallbacks still apply.

    Args:
        ctx: Tool execution context, or ``None`` when running without one.

    Returns:
        ``(api_key, using_own_key)``. ``api_key`` is an empty string when no
        key is configured anywhere; ``using_own_key`` is ``True`` only when
        the key came from the user's own stored key.
    """
    # 1) The user's own key takes priority.
    if ctx is not None and getattr(ctx, "user_id", None):
        try:
            from tools.manage_api_keys import get_user_api_key

            user_key = await get_user_api_key(
                ctx.user_id,
                "pollinations",
                redis_client=getattr(ctx, "redis", None),
                channel_id=getattr(ctx, "channel_id", None),
                config=getattr(ctx, "config", None),
            )
            if user_key:
                return user_key, True
        except Exception as exc:
            # Best effort: a broken key store must not block the shared key.
            logger.warning("Failed to resolve user Pollinations key: %s", exc)

    # 2) Shared key from the bot configuration.
    config = getattr(ctx, "config", None)
    if config is not None:
        # ``api_keys`` may be missing or explicitly None (e.g. an empty
        # config section); treat both as "no keys" instead of crashing on
        # ``None.get``.
        api_keys = getattr(config, "api_keys", None) or {}
        key = api_keys.get("pollinations", "")
        if key:
            return key, False

    # 3) Process environment as the last resort.
    return os.environ.get("POLLINATIONS_API_KEY", ""), False


async def _generate_single_image(
    prompt: str,
    api_key: str,
    model: str = DEFAULT_MODEL,
    negative_prompt: str = "",
    width: int = 1024,
    height: int = 1024,
    enhance: bool = False,
    seed: int = -1,
) -> bytes | None:
    """Fetch one image from Pollinations via ``GET /image/{prompt}``.

    The prompt is percent-encoded into the URL path; all other options are
    sent as query parameters. A ``Bearer`` authorization header is attached
    only when ``api_key`` is non-empty. The request is issued through the
    project's ``tools._safe_http`` helpers with a 120 second timeout and at
    most 5 redirects. Exceptions raised by those helpers are not caught here.

    Args:
        prompt: Text description of the image.
        api_key: Pollinations API key; may be empty.
        model: Model id; anything not in ``AVAILABLE_MODELS`` falls back to
            ``DEFAULT_MODEL``.
        negative_prompt: What to avoid; omitted from the query when empty.
        width: Image width in pixels.
        height: Image height in pixels.
        enhance: When true, ``enhance=true`` is added to the query.
        seed: Seed forwarded to the API as-is.

    Returns:
        The raw response body on HTTP 200 with a content-type containing
        ``"image"`` or ``"octet"``; ``None`` otherwise (the reason is logged).
    """
    from urllib.parse import quote

    from tools._safe_http import safe_http_request, safe_httpx_client

    if model not in AVAILABLE_MODELS:
        model = DEFAULT_MODEL

    # safe="" so that "/" inside the prompt is escaped as well and cannot
    # introduce extra path segments.
    request_url = "/".join((POLLINATIONS_IMAGE_URL, quote(prompt, safe="")))

    # Optional entries are appended last so the query keeps the same key
    # order: model, width, height, seed, nologo, negative_prompt, enhance.
    query: dict[str, Any] = {
        "model": model,
        "width": width,
        "height": height,
        "seed": seed,
        "nologo": "true",
        **({"negative_prompt": negative_prompt} if negative_prompt else {}),
        **({"enhance": "true"} if enhance else {}),
    }

    auth_headers: dict[str, str] = (
        {"Authorization": f"Bearer {api_key}"} if api_key else {}
    )

    async with safe_httpx_client(timeout=120.0) as client:
        response = await safe_http_request(
            client,
            "GET",
            request_url,
            params=query,
            headers=auth_headers,
            max_redirects=5,
        )

        if response.status_code == 200:
            content_type = response.headers.get("content-type", "")
            if "image" in content_type or "octet" in content_type:
                return response.content
            logger.warning(
                "Pollinations returned non-image content-type: %s",
                content_type,
            )
            return None

        logger.error(
            "Pollinations API error: %d - %s",
            response.status_code,
            response.text[:500],
        )
        return None


def _encode_png(data: bytes) -> bytes:
    """Re-encode arbitrary image bytes as PNG.

    Synchronous and CPU-bound; ``run`` calls it through
    ``asyncio.to_thread`` so the event loop is not blocked.

    Args:
        data: Raw image bytes as returned by the Pollinations API.

    Returns:
        The image re-encoded as PNG.
    """
    # Imported lazily so a missing Pillow install surfaces as a per-image
    # error inside ``run`` rather than breaking import of this module.
    from PIL import Image

    buf = BytesIO()
    # The context manager releases the decoder's resources once re-saved.
    with Image.open(BytesIO(data)) as img:
        img.save(buf, format="PNG")
    return buf.getvalue()


async def run(
    prompt: str,
    model: str = DEFAULT_MODEL,
    negative_prompt: str = "",
    width: int = 1024,
    height: int = 1024,
    count: int = 1,
    enhance: bool = False,
    seed: int = -1,
    ctx: ToolContext | None = None,
) -> str:
    """Generate images via Pollinations and send them to the channel.

    Up to four images are generated concurrently. Each image is fetched with
    ``_generate_single_image``, normalised to PNG in a worker thread, named
    after a SHA-256 prefix of its bytes, uploaded with
    ``ctx.adapter.send_file`` and recorded in ``ctx.sent_files``. A failure
    in one slot is logged and reported without aborting the other slots.

    Args:
        prompt: Text description of the image(s) to generate.
        model: Image model to use; unknown names fall back to
            ``DEFAULT_MODEL``.
        negative_prompt: What to avoid in the image.
        width: Image width in pixels.
        height: Image height in pixels.
        count: Number of images to generate; clamped to 1-4.
        enhance: Let AI improve the prompt.
        seed: Seed for reproducibility (any negative value = random).
        ctx: Tool execution context.

    Returns:
        str: JSON text. ``{"error": ...}`` when no adapter or API key is
        available or when every image failed. Otherwise an object with
        ``success``, ``model``, ``count``, ``images``, ``result`` and, if
        some slots failed, ``warnings``. ``images`` and ``warnings`` are
        ordered by slot index.
    """
    if ctx is None or ctx.adapter is None:
        return json.dumps({"error": "No platform adapter available."})

    # Clamp count to the supported 1-4 range.
    count = max(1, min(4, count))

    api_key, _using_own_key = await _resolve_api_key(ctx)
    if not api_key:
        return json.dumps(
            {
                "error": "No Pollinations API key available. "
                "Set POLLINATIONS_API_KEY in environment, add to "
                "config.yaml under api_keys.pollinations, or provide "
                "your own key via: set_user_api_key "
                "service=pollinations api_key=YOUR_KEY",
            }
        )

    model = model if model in AVAILABLE_MODELS else DEFAULT_MODEL

    async def _gen(idx: int) -> tuple[dict[str, Any] | None, str | None]:
        """Generate, convert and send the image for slot ``idx``.

        Args:
            idx: Zero-based slot index within the batch.

        Returns:
            ``(image_record, None)`` on success or ``(None, error_text)`` on
            failure. Exceptions are caught and reported as ``error_text``.
        """
        label = idx + 1
        # A fixed seed is offset by the slot index so images in one batch
        # differ (slot 0 keeps the seed unchanged); any negative seed is
        # normalised to -1, i.e. random.
        current_seed = seed + idx if seed >= 0 else -1
        try:
            img_bytes = await _generate_single_image(
                prompt,
                api_key,
                model,
                negative_prompt,
                width,
                height,
                enhance,
                current_seed,
            )
            if not img_bytes:
                return None, f"Image {label}: No image returned."

            # Convert to PNG for a consistent output format.
            png_bytes = await asyncio.to_thread(_encode_png, img_bytes)
            digest = hashlib.sha256(png_bytes).hexdigest()[:16]
            fname = f"pollinated_{model}_{digest}.png"

            file_url = await ctx.adapter.send_file(
                ctx.channel_id,
                png_bytes,
                fname,
                "image/png",
            )
            ctx.sent_files.append(
                {
                    "data": png_bytes,
                    "filename": fname,
                    "mimetype": "image/png",
                    "file_url": file_url or "",
                }
            )
            record = {
                "filename": fname,
                "file_url": file_url or "",
                "index": label,
            }
            return record, None
        except Exception as exc:
            logger.error(
                "Pollinate image %d error: %s",
                label,
                exc,
                exc_info=True,
            )
            return None, f"Image {label}: {exc}"

    # gather() returns outcomes in the order the coroutines were passed, so
    # ``results`` and ``errors`` are ordered by slot index regardless of
    # which request finishes first.
    outcomes = await asyncio.gather(*(_gen(i) for i in range(count)))
    results: list[dict[str, Any]] = [
        record for record, _ in outcomes if record is not None
    ]
    errors: list[str] = [err for _, err in outcomes if err is not None]

    if not results and errors:
        return json.dumps({"error": "; ".join(errors)})

    output: dict[str, Any] = {
        "success": True,
        "model": model,
        "count": len(results),
        "images": results,
        "result": (
            f"{len(results)} image(s) generated via Pollinations "
            f"({model}) and sent to the channel."
        ),
    }
    if errors:
        output["warnings"] = errors
    return json.dumps(output)