# Source code for tools.grok_imagine

"""GameGirl Color -- Asset animation tool via Grok Imagine.

Takes an existing game asset (image URL) and animates it
into a short video using xAI's Grok Imagine img2vid API.
The result is sent as an animated WebP (default), MP4, or GIF.
# 🌀💀 CORRUPTED ANIMATION PIPELINE
"""

from __future__ import annotations

import asyncio
import base64
import jsonutil as json
import logging
import os
import tempfile
from pathlib import Path
from typing import Any, TYPE_CHECKING

import aiohttp
import httpx

from tools._safe_http import (
    assert_safe_http_url,
    safe_http_request,
    safe_httpx_client,
)

if TYPE_CHECKING:
    from tool_context import ToolContext

# Module-level logger; records are emitted under this module's dotted name.
logger = logging.getLogger(__name__)

# xAI Grok Imagine API endpoints  # 🕷️
# NOTE(review): the model id below is a *video* model while both paths live
# under ``/images/generations`` -- confirm against the current xAI API docs.
_XAI_API_BASE = "https://api.x.ai/v1"
_GENERATE_URL = _XAI_API_BASE + "/images/generations"
# Template string: fill in with ``_POLL_URL.format(request_id=...)``.
_POLL_URL = _GENERATE_URL + "/{request_id}"
_MODEL = "grok-imagine-video"

# Polling cadence used while waiting for an asynchronous generation result.
_POLL_INTERVAL = 3.0  # seconds to sleep between two status polls
_POLL_TIMEOUT = 120.0  # seconds to keep polling before giving up

# Name under which the tool is registered and invoked.
TOOL_NAME = "animate_asset"

# Summary shown to the model/user when the tool is listed.
TOOL_DESCRIPTION = (
    "Animate a game asset (still image) into a short video "
    "using xAI's Grok Imagine image-to-video API. "
    "Takes an existing game asset by name or URL and converts it "
    "into a dynamic animated clip (6-15 seconds). "
    "The result is sent to the current channel. "
    "Optionally provide a motion prompt to guide the animation style."
)

# JSON-Schema description of the arguments accepted by ``run``.
# Only ``asset_name`` is mandatory; every other argument has a default.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        # What to animate: a saved asset's name or a direct image URL.
        "asset_name": {
            "type": "string",
            "description": (
                "Name of a saved game asset to animate, "
                "OR a direct image URL. "
                "If a name is given, looks up the URL from "
                "the game's asset registry."
            ),
        },
        # Free-text hint steering the generated motion.
        "prompt": {
            "type": "string",
            "description": (
                "Optional motion prompt describing how the image should animate. "
                "E.g. 'gentle swaying motion, particles floating upward, "
                "dramatic lighting shift'. "
                "If omitted, the AI generates motion automatically."
            ),
        },
        # When set, the result is also registered as a new game asset.
        "save_as": {
            "type": "string",
            "description": (
                "Optional name to save the animated result as a new game asset."
            ),
        },
        # Container/codec of the file delivered to the channel.
        "output_format": {
            "type": "string",
            "description": (
                "Output format: 'webp' (default, animated, embeds inline in Discord), "
                "'mp4' (video file), or 'gif' (legacy animated). "
                "WebP is recommended for quality and file size."
            ),
            "enum": ["webp", "mp4", "gif"],
        },
    },
    "required": ["asset_name"],
}


async def _resolve_api_key(ctx: ToolContext | None) -> tuple[str, bool]:
    """Resolve an xAI API key, preferring the caller's own key.

    Candidates are tried in priority order:

    1. The invoking user's stored key, fetched with
       ``tools.manage_api_keys.get_user_api_key`` (which is handed the
       context's Redis client, channel id and config). Only attempted when
       ``ctx`` carries a truthy ``user_id``.
    2. The shared key at ``ctx.config.api_keys["xai"]``.
    3. The ``XAI_API_KEY`` environment variable.

    A failure in step 1 is logged and treated as "no personal key" so the
    shared/environment fallbacks still apply. A missing or ``None``
    ``api_keys`` mapping is likewise treated as "no shared key" instead of
    raising.

    Used by ``run`` in this module to authorize the img2vid request.

    Args:
        ctx: Tool execution context; supplies ``user_id``, ``redis``,
            ``channel_id`` and ``config`` for the key lookups. May be
            ``None``, in which case only the environment fallback applies.

    Returns:
        tuple[str, bool]: ``(api_key, using_own_key)``. ``api_key`` is empty if
        no key could be resolved; ``using_own_key`` is ``True`` only when the
        user's personal key was found.
    """
    if ctx is not None and getattr(ctx, "user_id", None):
        try:
            # Imported lazily so this module still loads when the key-management
            # tool is unavailable.
            from tools.manage_api_keys import get_user_api_key

            user_key = await get_user_api_key(
                ctx.user_id,
                "xai",
                redis_client=getattr(ctx, "redis", None),
                channel_id=getattr(ctx, "channel_id", None),
                config=getattr(ctx, "config", None),
            )
            if user_key:
                return user_key, True
        except Exception as exc:
            # Best effort: fall through to the shared / environment key.
            logger.warning("Failed to resolve user xAI key: %s", exc)

    # Fallback to config / environment. ``getattr`` on ``None`` simply yields
    # the default, so a missing ctx or config needs no special-casing, and
    # ``or {}`` covers an ``api_keys`` attribute that exists but is ``None``.
    config = getattr(ctx, "config", None)
    api_keys = getattr(config, "api_keys", None) or {}
    shared_key = api_keys.get("xai", "")
    if shared_key:
        return shared_key, False

    return os.environ.get("XAI_API_KEY", ""), False


async def _resolve_image_url(
    asset_name: str,
    channel_id: str,
    redis: Any = None,
) -> str | None:
    """Resolve a game-asset name (or pass-through URL) to an image URL.

    A value that, once surrounding whitespace is removed, starts with
    ``http://`` or ``https://`` is treated as a direct URL and returned
    stripped. Anything else is treated as the name of a saved game asset and
    looked up through ``game_session.get_session`` and
    ``game_assets.get_asset_by_name`` for the session attached to
    ``channel_id``.

    The lookup is only attempted when the session exists, is active, and a
    Redis client was supplied. If the game modules cannot be imported, or no
    session/asset matches, ``None`` is returned.

    Used by ``run`` in this module.

    Args:
        asset_name: Either a direct image URL or the name of a saved game asset.
        channel_id: Channel used to locate the active game session.
        redis: Client passed through to the asset lookup; when falsy, no asset
            resolution is attempted.

    Returns:
        The resolved image URL, or ``None`` if it could not be determined.
    """
    # Direct URL. Tolerate stray whitespace/newlines around a pasted URL so it
    # is not mistaken for an asset name (the downloader strips it as well).
    candidate = asset_name.strip()
    if candidate.startswith(("http://", "https://")):
        return candidate

    # Look up from game assets  # 💀
    try:
        from game_session import get_session
        from game_assets import get_asset_by_name
    except ImportError:
        # Game modules are optional; without them only direct URLs work.
        return None

    session = get_session(channel_id)
    if session and session.active and redis:
        asset = await get_asset_by_name(
            session.game_id,
            asset_name,
            redis=redis,
        )
        if asset:
            return asset.url

    return None


async def _download_as_base64(url: str) -> str | None:
    """Download an image and return it as a base64 data URI.

    The image is fetched with ``safe_httpx_client`` / ``safe_http_request``
    from ``tools._safe_http`` (30 s timeout, at most 5 redirects). A
    ``ValueError`` raised during the fetch is logged as a blocked URL.

    The media type of the data URI is taken from the response's
    ``Content-Type`` header with any parameters (``; charset=...``) removed.
    When the header is missing, blank, or consists of parameters only, it
    falls back to ``image/png`` -- an empty media type would otherwise produce
    ``data:;base64,...``, which RFC 2397 defines as ``text/plain``.

    Used by ``run`` in this module.

    Args:
        url: The image URL to download; whitespace is stripped before fetching.

    Returns:
        A ``data:<mime>;base64,<...>`` string on success, or ``None`` if the URL
        was blocked, the status was non-200, the body was empty, or the
        download failed.
    """
    try:
        async with safe_httpx_client(timeout=httpx.Timeout(30.0)) as client:
            resp = await safe_http_request(client, "GET", url.strip(), max_redirects=5)
            if resp.status_code != 200:
                return None

            data = resp.content
            if not data:
                # An empty payload cannot be a usable seed frame.
                logger.warning("Image download for animation returned an empty body")
                return None

            header = resp.headers.get("content-type") or ""
            media_type = header.split(";", 1)[0].strip() or "image/png"
            b64 = base64.b64encode(data).decode("ascii")
            return f"data:{media_type};base64,{b64}"
    except ValueError as exc:
        logger.warning("Blocked Grok image URL: %s", exc)
        return None
    except Exception as exc:
        logger.warning("Failed to download image for animation: %s", exc)
        return None


async def _convert_format(
    data: bytes,
    source_ext: str,
    target_ext: str,
) -> bytes | None:
    """Transcode animation bytes between formats using ffmpeg.

    Writes ``data`` to a temporary directory, runs ``ffmpeg`` on it through
    ``asyncio.create_subprocess_exec`` and reads the result back. WebP output
    uses ``libwebp`` (lossy, quality 75, infinite loop); GIF output is reduced
    to 15 fps and 480 px width; MP4 output relies on ffmpeg's defaults.

    The child process is given 60 seconds. If it has not finished by then it
    is killed and reaped, so no ffmpeg process outlives the call or keeps
    writing into the temporary directory while it is being removed.

    Used by ``run`` in this module, which falls back to the original bytes
    when this returns a falsy value.

    Args:
        data: The raw animation/video bytes to convert.
        source_ext: Extension describing the input bytes (e.g. ``"mp4"``).
        target_ext: Desired output extension (``"webp"``, ``"mp4"``, ``"gif"``).

    Returns:
        The converted bytes, the original ``data`` when ``source_ext`` equals
        ``target_ext``, or ``None`` if ffmpeg is unavailable, timed out,
        exited non-zero, produced no output file, or anything else failed.
    """
    if source_ext == target_ext:
        return data

    timeout_s = 60.0
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            src = Path(tmpdir) / f"input.{source_ext}"
            dst = Path(tmpdir) / f"output.{target_ext}"
            src.write_bytes(data)

            cmd = ["ffmpeg", "-y", "-i", str(src)]
            if target_ext == "webp":
                cmd += [
                    "-vcodec",
                    "libwebp",
                    "-lossless",
                    "0",
                    "-quality",
                    "75",
                    "-loop",
                    "0",
                ]
            elif target_ext == "gif":
                cmd += ["-vf", "fps=15,scale=480:-1:flags=lanczos"]
            # mp4 default codec is fine
            cmd.append(str(dst))

            proc = await asyncio.create_subprocess_exec(
                *cmd,
                # Never let ffmpeg read from (or block on) the parent's stdin.
                stdin=asyncio.subprocess.DEVNULL,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                _, stderr = await asyncio.wait_for(
                    proc.communicate(), timeout=timeout_s
                )
            except asyncio.TimeoutError:
                # wait_for() only cancels communicate(); the child keeps
                # running unless it is terminated explicitly.
                if proc.returncode is None:
                    try:
                        proc.kill()
                    except ProcessLookupError:
                        pass  # exited between the check and the kill
                    await proc.wait()
                logger.warning("ffmpeg conversion timed out after %.0fs", timeout_s)
                return None

            if proc.returncode != 0:
                logger.warning(
                    "ffmpeg conversion failed: %s",
                    stderr.decode(errors="replace")[:300],
                )
                return None
            if not dst.is_file():
                logger.warning("ffmpeg reported success but wrote no output file")
                return None
            return dst.read_bytes()
    except FileNotFoundError:
        # Raised by create_subprocess_exec when the ffmpeg binary is missing.
        logger.warning("ffmpeg not found -- returning original format")
        return None
    except Exception as exc:
        logger.warning("Format conversion failed: %s", exc)
        return None


def _extract_video_url(payload: Any) -> str:
    """Return the first result URL in an API payload, or ``""`` if absent.

    Expects the shape ``{"data": [{"url": ...}, ...]}`` and tolerates any
    other shape (non-dict payload, missing/empty/non-list ``data``, non-dict
    first item) by returning an empty string instead of raising.
    """
    if not isinstance(payload, dict):
        return ""
    items = payload.get("data")
    if not items or not isinstance(items, list):
        return ""
    first = items[0]
    if not isinstance(first, dict):
        return ""
    return first.get("url") or ""


def _safe_file_stem(name: str, max_len: int = 30) -> str:
    """Turn an asset name or URL into a filename-safe stem.

    The name is truncated to ``max_len`` characters, then every character that
    is not alphanumeric or one of ``-``, ``_``, ``.`` is replaced by ``_`` (so
    spaces, path separators, colons and query characters cannot end up in the
    filename). Leading dots are dropped; an empty result becomes ``"asset"``.
    """
    cleaned = "".join(
        ch if (ch.isalnum() or ch in "-_.") else "_" for ch in str(name)[:max_len]
    )
    return cleaned.lstrip(".") or "asset"


async def _poll_for_video(request_id: str, headers: dict[str, str]) -> tuple[str, str]:
    """Poll the generation endpoint until it finishes or the time budget ends.

    Sleeps ``_POLL_INTERVAL`` seconds before each poll and stops once
    ``_POLL_TIMEOUT`` seconds of wall-clock time have passed (request time
    included). One HTTP session is reused for all polls. Non-200 responses and
    transport/parse errors are treated as "not ready yet".

    Returns:
        ``(video_url, error)``. ``error`` is non-empty only when the API
        reported ``status == "failed"``. Both are empty when polling timed out
        or the API reported completion without a result URL.
    """
    poll_url = _POLL_URL.format(request_id=request_id)
    loop = asyncio.get_running_loop()
    deadline = loop.time() + _POLL_TIMEOUT

    async with aiohttp.ClientSession() as session:
        while loop.time() < deadline:
            await asyncio.sleep(_POLL_INTERVAL)
            try:
                async with session.get(
                    poll_url,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    if resp.status != 200:
                        continue
                    poll_data = await resp.json()
            except Exception as exc:
                # Transient failure: keep polling until the deadline.
                logger.debug("Grok Imagine poll failed, retrying: %s", exc)
                continue

            if not isinstance(poll_data, dict):
                continue
            status = poll_data.get("status", "")
            if status == "completed":
                return _extract_video_url(poll_data), ""
            if status == "failed":
                error = poll_data.get("error", "Unknown")
                return "", f"Animation failed: {error}"
            # else: still processing, keep polling

    return "", ""


async def run(
    asset_name: str,
    prompt: str = "",
    save_as: str = "",
    output_format: str = "webp",
    ctx: ToolContext | None = None,
) -> str:
    """Animate a still game asset into a short clip via Grok Imagine img2vid.

    Entry point for the ``animate_asset`` tool. Steps, in order:

    1. Normalise ``output_format`` (unknown or missing values become
       ``"webp"``) and resolve an xAI key with ``_resolve_api_key``.
    2. Resolve the seed image with ``_resolve_image_url`` and embed it as a
       base64 data URI with ``_download_as_base64``.
    3. POST the generation request; if the response carries no result URL but
       does carry a ``request_id``, poll for completion.
    4. Download the result through the SSRF-guarded HTTP helpers, transcode it
       with ``_convert_format`` when the requested format differs (falling
       back to the source format if conversion fails), and deliver it with
       ``ctx.adapter.send_file``; a record is appended to ``ctx.sent_files``.
    5. When ``save_as`` is given and a game session is active, register the
       result with ``game_assets.upload_asset`` (best effort; failures are
       only logged).

    Invoked by the tool runner as the handler of the registered
    ``animate_asset`` tool.

    Args:
        asset_name: Name of a saved game asset to animate, or a direct image
            URL.
        prompt: Optional motion prompt guiding the animation; a generic default
            is used when empty.
        save_as: Optional name under which to register the result as a new
            game asset.
        output_format: Desired output container (``"webp"``, ``"mp4"``, or
            ``"gif"``); invalid values fall back to ``"webp"``.
        ctx: Tool execution context; supplies the channel id, Redis client,
            config, user id, outbound adapter, and ``sent_files`` list.

    Returns:
        str: JSON. On success an object with ``success``, ``source``,
        ``animated_url``, ``filename``, ``saved_as`` and, when the adapter
        returned one, ``file_url``. On failure an ``{"error": ...}`` object
        describing what went wrong.
    """
    # Tolerate ``None`` from callers that pass explicit nulls.
    output_format = (output_format or "").lower().strip()
    if output_format not in ("webp", "mp4", "gif"):
        output_format = "webp"

    api_key, _using_own_key = await _resolve_api_key(ctx)
    if not api_key:
        return json.dumps(
            {
                "error": "No xAI API key available. "
                "Provide your own key via: set_user_api_key "
                "service=xai api_key=YOUR_KEY",
            }
        )

    if ctx is None:
        return json.dumps({"error": "No tool context available."})

    channel_id = str(ctx.channel_id)
    redis = getattr(ctx, "redis", None)

    # Resolve the image URL  # 🌀
    image_url = await _resolve_image_url(asset_name, channel_id, redis)
    if not image_url:
        return json.dumps(
            {
                "error": f"Could not find asset '{asset_name}'. "
                "Provide a valid asset name or direct image URL.",
            }
        )

    # Download and encode as base64 for the API  # 💀
    image_data_uri = await _download_as_base64(image_url)
    if not image_data_uri:
        return json.dumps(
            {
                "error": f"Failed to download image from '{image_url}'.",
            }
        )

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    # Step 1: Initiate video generation  # 🔥
    payload: dict[str, Any] = {
        "model": _MODEL,
        "prompt": prompt or "Subtle, natural animation with smooth motion",
        "image": image_data_uri,
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                _GENERATE_URL,
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status not in (200, 201, 202):
                    error_text = await resp.text()
                    return json.dumps(
                        {
                            "error": f"Grok Imagine API error ({resp.status}): "
                            f"{error_text[:500]}",
                        }
                    )
                init_data = await resp.json()
    except Exception as exc:
        return json.dumps({"error": f"Animation request failed: {exc}"})

    # Check if we got a direct result or need to poll  # 🎮
    request_id = init_data.get("request_id", "") if isinstance(init_data, dict) else ""
    # Direct result (some endpoints return immediately)
    video_url = _extract_video_url(init_data)

    # Step 2: Poll for completion  # 🕷️
    if not video_url and request_id:
        video_url, poll_error = await _poll_for_video(str(request_id), headers)
        if poll_error:
            return json.dumps(
                {
                    "error": poll_error,
                }
            )

    if not video_url:
        return json.dumps(
            {
                "error": "Animation timed out or no result returned.",
            }
        )

    # Download and send the result  # 💀
    try:
        safe_video = assert_safe_http_url(str(video_url).strip())
    except ValueError as exc:
        return json.dumps({"error": f"Result URL blocked: {exc}"})

    try:
        async with safe_httpx_client(timeout=httpx.Timeout(60.0)) as client:
            resp = await safe_http_request(
                client, "GET", safe_video, max_redirects=5
            )
            if resp.status_code != 200:
                return json.dumps(
                    {
                        "error": f"Failed to download animated result "
                        f"({resp.status_code}).",
                    }
                )
            video_data = resp.content
            ct = (
                (resp.headers.get("content-type") or "video/mp4")
                .split(";")[0]
                .strip()
                .lower()
            )

        # Detect source format from content type  # 🌀
        source_ext = "mp4"
        if "webp" in ct:
            source_ext = "webp"
        elif "gif" in ct:
            source_ext = "gif"

        # Convert to requested format if different  # 🔥
        final_data = video_data
        final_ext = source_ext
        if output_format != source_ext:
            converted = await _convert_format(
                video_data,
                source_ext,
                output_format,
            )
            if converted:
                final_data = converted
                final_ext = output_format
            else:
                logger.info(
                    "Conversion to %s failed, sending as %s",
                    output_format,
                    source_ext,
                )

        # Content type mapping
        ct_map = {
            "webp": "image/webp",
            "mp4": "video/mp4",
            "gif": "image/gif",
        }
        final_ct = ct_map.get(final_ext, "video/mp4")

        # ``asset_name`` may be a full URL, so it must be sanitised before it
        # becomes part of a filename.
        fname = f"animated_{_safe_file_stem(save_as or asset_name)}.{final_ext}"

        file_url = await ctx.adapter.send_file(
            channel_id,
            final_data,
            fname,
            final_ct,
        )
        ctx.sent_files.append(
            {
                "data": final_data,
                "filename": fname,
                "mimetype": final_ct,
                "file_url": file_url or "",
            }
        )
    except Exception as exc:
        return json.dumps({"error": f"Failed to send animation: {exc}"})

    # Save as asset if requested  # 💾
    if save_as:
        try:
            from game_session import get_session
            from game_assets import upload_asset

            session_obj = get_session(channel_id)
            if session_obj and session_obj.active:
                user_id = str(getattr(ctx, "user_id", ""))
                # NOTE(review): this registers the API's result URL (source
                # format), not the converted file that was sent -- confirm
                # that is the intended asset to persist.
                await upload_asset(
                    game_id=session_obj.game_id,
                    name=save_as,
                    category="special",
                    url=video_url,
                    user_id=user_id,
                    turn=session_obj.turn_number,
                    redis=redis,
                )
        except Exception as exc:
            # Best effort: the clip was already delivered to the channel.
            logger.warning("Failed to save animated asset: %s", exc)

    result_info: dict = {
        "success": True,
        "source": asset_name,
        "animated_url": video_url,
        "filename": fname,
        "saved_as": save_as if save_as else None,
    }
    if file_url:
        result_info["file_url"] = file_url

    return json.dumps(result_info)