# Source code for tools.grok_imagine

"""GameGirl Color -- Asset animation tool via Grok Imagine.

Takes an existing game asset (image URL) and animates it
into a short video using xAI's Grok Imagine img2vid API.
The result is sent as an animated WebP (default), MP4, or GIF.
# 🌀💀 CORRUPTED ANIMATION PIPELINE
"""

from __future__ import annotations

import asyncio
import base64
import json
import logging
import os
import subprocess
import tempfile
from io import BytesIO
from pathlib import Path
from typing import Any, TYPE_CHECKING

import aiohttp

from tools._safe_http import assert_safe_http_url

if TYPE_CHECKING:
    from tool_context import ToolContext

logger = logging.getLogger(__name__)

# --- xAI Grok Imagine endpoints --------------------------------------------
# NOTE(review): a video model is paired with the /images/generations routes
# here -- confirm both paths against the current xAI API reference.
_XAI_API_BASE = "https://api.x.ai/v1"
_GENERATE_URL = _XAI_API_BASE + "/images/generations"
# ``{request_id}`` stays a literal ``str.format`` placeholder; it is filled in
# once the generation request has returned an id.
_POLL_URL = _GENERATE_URL + "/{request_id}"
_MODEL = "grok-imagine-video"

# --- Polling behaviour ------------------------------------------------------
_POLL_INTERVAL = 3.0  # pause between two status checks, in seconds
_POLL_TIMEOUT = 120.0  # give up waiting for a result after this many seconds

# Tool registration metadata: public name, human-readable summary and the
# JSON-Schema description of the arguments accepted by ``run``.
TOOL_NAME = "animate_asset"

TOOL_DESCRIPTION = " ".join((
    "Animate a game asset (still image) into a short video using xAI's "
    "Grok Imagine image-to-video API.",
    "Takes an existing game asset by name or URL and converts it into a "
    "dynamic animated clip (6-15 seconds).",
    "The result is sent to the current channel.",
    "Optionally provide a motion prompt to guide the animation style.",
))

TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        # What to animate: a registry name or a direct http(s) URL.
        "asset_name": {
            "type": "string",
            "description": (
                "Name of a saved game asset to animate, OR a direct image URL. "
                "If a name is given, looks up the URL from the game's asset "
                "registry."
            ),
        },
        # Free-text guidance for the motion; may be left out.
        "prompt": {
            "type": "string",
            "description": (
                "Optional motion prompt describing how the image should animate. "
                "E.g. 'gentle swaying motion, particles floating upward, "
                "dramatic lighting shift'. "
                "If omitted, the AI generates motion automatically."
            ),
        },
        # Name under which the animation is stored as a new asset.
        "save_as": {
            "type": "string",
            "description": (
                "Optional name to save the animated result as a new game asset."
            ),
        },
        # Container format of the file delivered to the channel.
        "output_format": {
            "type": "string",
            "description": (
                "Output format: 'webp' (default, animated, embeds inline in "
                "Discord), 'mp4' (video file), or 'gif' (legacy animated). "
                "WebP is recommended for quality and file size."
            ),
            "enum": ["webp", "mp4", "gif"],
        },
    },
    "required": ["asset_name"],
}


async def _resolve_api_key(ctx: ToolContext | None) -> tuple[str, bool]:
    """Pick the xAI API key to use for this call.

    Lookup order:

    1. A key returned by ``get_user_api_key`` for ``ctx.user_id`` (only
       attempted when the context carries a truthy ``user_id``).
    2. ``ctx.config.api_keys["xai"]``.
    3. The ``XAI_API_KEY`` environment variable.

    Args:
        ctx: Tool execution context, or ``None`` when running without one.

    Returns:
        ``(api_key, using_own_key)``. ``api_key`` is ``""`` when nothing was
        found; ``using_own_key`` is ``True`` only for a key obtained in
        step 1.
    """
    if ctx is not None and getattr(ctx, "user_id", None):
        try:
            from tools.manage_api_keys import get_user_api_key
            user_key = await get_user_api_key(
                ctx.user_id, "xai",
                redis_client=getattr(ctx, "redis", None),
                channel_id=getattr(ctx, "channel_id", None),
                config=getattr(ctx, "config", None),
            )
            if user_key:
                return user_key, True
        except Exception as exc:
            # Best effort: a broken per-user lookup must not block the
            # shared-key fallbacks below.
            logger.warning("Failed to resolve user xAI key: %s", exc)

    # Fallback to config / environment.
    config = getattr(ctx, "config", None) if ctx is not None else None
    # ``api_keys`` may be missing *or* explicitly None; treat both as empty
    # so the environment fallback is still reached.
    api_keys = getattr(config, "api_keys", None) or {}
    shared_key = api_keys.get("xai", "")
    if shared_key:
        return shared_key, False

    return os.environ.get("XAI_API_KEY", ""), False


async def _resolve_image_url(
    asset_name: str,
    channel_id: str,
    redis: Any = None,
) -> str | None:
    """Turn an asset reference into an image URL.

    Args:
        asset_name: Either a literal ``http://`` / ``https://`` URL or the
            name of an asset stored for the channel's game.
        channel_id: Channel whose game session is consulted for name lookups.
        redis: Client handed to ``get_asset_by_name``; without it no name
            lookup is attempted.

    Returns:
        The URL, or ``None`` when the name cannot be resolved (no active
        session, no redis client, unknown asset, or the game modules cannot
        be imported).
    """
    # A literal URL is passed straight through.
    if asset_name.startswith(("http://", "https://")):
        return asset_name

    try:
        from game_session import get_session
        from game_assets import get_asset_by_name

        game = get_session(channel_id)
        if not (game and game.active and redis):
            return None

        match = await get_asset_by_name(game.game_id, asset_name, redis=redis)
        return match.url if match else None
    except ImportError:
        # Game modules are unavailable in this deployment.
        return None


async def _download_as_base64(url: str) -> str | None:
    """Fetch an image and return it as a ``data:`` URI.

    The URL is stripped of surrounding whitespace and must pass
    ``assert_safe_http_url`` before any request is made.

    Args:
        url: Location of the image to download.

    Returns:
        ``data:<content-type>;base64,<payload>``, or ``None`` when the URL is
        rejected, the response status is not 200, or the request fails.
    """
    try:
        checked_url = assert_safe_http_url(url.strip())
    except ValueError as exc:
        logger.warning("Blocked Grok image URL: %s", exc)
        return None

    try:
        async with aiohttp.ClientSession() as http:
            async with http.get(
                checked_url,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as response:
                if response.status == 200:
                    body = await response.read()
                    # Fall back to PNG when the response reports no type.
                    mime = response.content_type or "image/png"
                    encoded = base64.b64encode(body).decode("ascii")
                    return f"data:{mime};base64,{encoded}"
                return None
    except Exception as exc:
        logger.warning("Failed to download image for animation: %s", exc)
        return None


async def _convert_format(
    data: bytes,
    source_ext: str,
    target_ext: str,
) -> bytes | None:
    """Convert a video/animation payload to another format using ffmpeg.

    The payload is written to a temporary directory, ``ffmpeg`` is run on it
    as a subprocess, and the converted file is read back.

    Args:
        data: Raw bytes of the source file.
        source_ext: Extension of the source format (used for the input name).
        target_ext: Extension of the wanted format; ``"webp"`` and ``"gif"``
            get dedicated encoder options, anything else uses ffmpeg defaults.

    Returns:
        The converted bytes, ``data`` unchanged when both extensions are
        equal, or ``None`` when ffmpeg is missing, fails, or times out.
    """
    if source_ext == target_ext:
        return data

    timeout_s = 60

    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            src = Path(tmpdir) / f"input.{source_ext}"
            dst = Path(tmpdir) / f"output.{target_ext}"
            src.write_bytes(data)

            cmd = ["ffmpeg", "-y", "-i", str(src)]
            if target_ext == "webp":
                cmd += ["-vcodec", "libwebp", "-lossless", "0",
                        "-quality", "75", "-loop", "0"]
            elif target_ext == "gif":
                cmd += ["-vf", "fps=15,scale=480:-1:flags=lanczos"]
            # mp4 default codec is fine
            cmd.append(str(dst))

            proc = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.DEVNULL,
                stderr=asyncio.subprocess.PIPE,
            )
            try:
                _, stderr = await asyncio.wait_for(
                    proc.communicate(), timeout=timeout_s,
                )
            except asyncio.TimeoutError:
                logger.warning(
                    "ffmpeg conversion timed out after %ss", timeout_s,
                )
                return None
            finally:
                # wait_for() only cancels communicate(); the child itself
                # keeps running unless it is killed and reaped here, and it
                # would otherwise outlive the temp directory it writes to.
                if proc.returncode is None:
                    try:
                        proc.kill()
                    except ProcessLookupError:
                        pass  # exited between the check and the kill
                    await proc.wait()

            if proc.returncode != 0:
                logger.warning(
                    "ffmpeg conversion failed: %s",
                    # ffmpeg output is not guaranteed to be valid UTF-8.
                    stderr.decode(errors="replace")[:300],
                )
                return None
            return dst.read_bytes()
    except FileNotFoundError:
        logger.warning("ffmpeg not found -- returning original format")
        return None
    except Exception as exc:
        logger.warning("Format conversion failed: %s", exc)
        return None


def _safe_filename_stem(name: str, max_len: int = 30) -> str:
    """Reduce *name* to at most *max_len* characters that are safe in a filename."""
    cleaned = "".join(
        ch if ch.isalnum() or ch in "-_." else "_"
        for ch in name[:max_len]
    )
    # Avoid empty or dot-only stems (e.g. when the name was all punctuation).
    return cleaned.strip(".") or "asset"


async def _poll_for_video_url(
    poll_url: str,
    headers: dict[str, str],
) -> tuple[str | None, str | None]:
    """Poll *poll_url* until the job finishes, fails, or ``_POLL_TIMEOUT`` passes.

    Returns ``(video_url, error)``: a URL on success, an error description
    when the job reports ``failed``, and ``(None, None)`` on timeout or when
    a completed job carries no URL.
    """
    loop = asyncio.get_running_loop()
    # Wall-clock deadline, so slow poll requests count toward the timeout too.
    deadline = loop.time() + _POLL_TIMEOUT

    # One session for the whole polling phase instead of one per attempt.
    async with aiohttp.ClientSession() as session:
        while loop.time() < deadline:
            await asyncio.sleep(_POLL_INTERVAL)
            try:
                async with session.get(
                    poll_url,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    if resp.status != 200:
                        continue
                    poll_data = await resp.json()

                status = poll_data.get("status", "")
                if status == "completed":
                    items = poll_data.get("data", [])
                    url = items[0].get("url", "") if items else ""
                    return (url or None), None
                if status == "failed":
                    return None, str(poll_data.get("error", "Unknown"))
                # Any other status: still processing, keep polling.
            except Exception as exc:
                # Transient network/parse problems are retried until the
                # deadline; log them so repeated failures are diagnosable.
                logger.debug("Grok Imagine poll attempt failed: %s", exc)
                continue

    return None, None


async def run(
    asset_name: str,
    prompt: str = "",
    save_as: str = "",
    output_format: str = "webp",
    ctx: ToolContext | None = None,
) -> str:
    """Animate a game asset using Grok Imagine img2vid.

    Resolves the asset to an image URL, submits it to the generation
    endpoint, polls for the result when the response only carries a request
    id, downloads the produced file, converts it to the requested format
    when possible, and sends it to the current channel.

    Args:
        asset_name: Asset name or URL to animate.
        prompt: Optional motion prompt.
        save_as: Optional name to save result as asset.
        output_format: Output format (webp, mp4, gif); anything else falls
            back to ``webp``.
        ctx: Tool execution context.

    Returns:
        str: JSON result. On failure an object with an ``error`` key; on
        success an object with ``success``, ``source``, ``animated_url``,
        ``filename``, ``format``, ``saved_as`` (``None`` unless the asset was
        actually stored) and, when available, ``file_url``.
    """
    # Callers may pass None for optional arguments.
    output_format = (output_format or "").lower().strip()
    if output_format not in ("webp", "mp4", "gif"):
        output_format = "webp"

    api_key, _using_own_key = await _resolve_api_key(ctx)
    if not api_key:
        return json.dumps({
            "error": "No xAI API key available. "
                     "Provide your own key via: set_user_api_key "
                     "service=xai api_key=YOUR_KEY",
        })

    if ctx is None:
        return json.dumps({"error": "No tool context available."})

    channel_id = str(ctx.channel_id)
    redis = getattr(ctx, "redis", None)

    # Resolve the image URL
    image_url = await _resolve_image_url(asset_name, channel_id, redis)
    if not image_url:
        return json.dumps({
            "error": f"Could not find asset '{asset_name}'. "
                     "Provide a valid asset name or direct image URL.",
        })

    # Download and encode as base64 for the API
    image_data_uri = await _download_as_base64(image_url)
    if not image_data_uri:
        return json.dumps({
            "error": f"Failed to download image from '{image_url}'.",
        })

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    # Step 1: Initiate video generation
    payload: dict[str, Any] = {
        "model": _MODEL,
        "prompt": prompt or "Subtle, natural animation with smooth motion",
        "image": image_data_uri,
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                _GENERATE_URL,
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status not in (200, 201, 202):
                    error_text = await resp.text()
                    return json.dumps({
                        "error": f"Grok Imagine API error ({resp.status}): "
                                 f"{error_text[:500]}",
                    })
                init_data = await resp.json()
    except Exception as exc:
        return json.dumps({"error": f"Animation request failed: {exc}"})

    # Check if we got a direct result or need to poll
    request_id = init_data.get("request_id", "")
    video_url = None

    # Direct result (some endpoints return immediately)
    if "data" in init_data:
        items = init_data["data"]
        if items and isinstance(items, list):
            video_url = items[0].get("url", "")

    # Step 2: Poll for completion
    if not video_url and request_id:
        video_url, poll_error = await _poll_for_video_url(
            _POLL_URL.format(request_id=request_id), headers,
        )
        if poll_error is not None:
            return json.dumps({"error": f"Animation failed: {poll_error}"})

    if not video_url:
        return json.dumps({
            "error": "Animation timed out or no result returned.",
        })

    # Download and send the result. The result URL comes from the API
    # response, so it is validated before being fetched.
    try:
        safe_video = assert_safe_http_url(str(video_url).strip())
    except ValueError as exc:
        return json.dumps({"error": f"Result URL blocked: {exc}"})

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(
                safe_video,
                timeout=aiohttp.ClientTimeout(total=60),
            ) as resp:
                if resp.status != 200:
                    return json.dumps({
                        "error": f"Failed to download animated result "
                                 f"({resp.status}).",
                    })
                video_data = await resp.read()
                ct = resp.content_type or "video/mp4"

        # Detect source format from content type
        source_ext = "mp4"
        if "webp" in ct:
            source_ext = "webp"
        elif "gif" in ct:
            source_ext = "gif"

        # Convert to requested format if different; on failure the file is
        # delivered in its source format instead.
        final_data = video_data
        final_ext = source_ext
        if output_format != source_ext:
            converted = await _convert_format(
                video_data, source_ext, output_format,
            )
            if converted:
                final_data = converted
                final_ext = output_format
            else:
                logger.info(
                    "Conversion to %s failed, sending as %s",
                    output_format, source_ext,
                )

        # Content type mapping
        ct_map = {
            "webp": "image/webp",
            "mp4": "video/mp4",
            "gif": "image/gif",
        }
        final_ct = ct_map.get(final_ext, "video/mp4")

        # asset_name may be a URL; strip characters such as '/' and ':'
        # that are not valid in an attachment filename.
        safe_name = _safe_filename_stem(save_as or asset_name)
        fname = f"animated_{safe_name}.{final_ext}"

        file_url = await ctx.adapter.send_file(
            channel_id, final_data, fname, final_ct,
        )
        ctx.sent_files.append({
            "data": final_data,
            "filename": fname,
            "mimetype": final_ct,
            "file_url": file_url or "",
        })
    except Exception as exc:
        return json.dumps({"error": f"Failed to send animation: {exc}"})

    # Save as asset if requested. Only report the name when the upload
    # actually happened, so callers are not told about a non-existent asset.
    saved_name: str | None = None
    if save_as:
        try:
            from game_session import get_session
            from game_assets import upload_asset

            session_obj = get_session(channel_id)
            if session_obj and session_obj.active:
                user_id = str(getattr(ctx, "user_id", ""))
                await upload_asset(
                    game_id=session_obj.game_id,
                    name=save_as,
                    category="special",
                    url=video_url,
                    user_id=user_id,
                    turn=session_obj.turn_number,
                    redis=redis,
                )
                saved_name = save_as
        except Exception as exc:
            logger.warning("Failed to save animated asset: %s", exc)

    result_info: dict[str, Any] = {
        "success": True,
        "source": asset_name,
        "animated_url": video_url,
        "filename": fname,
        "format": final_ext,
        "saved_as": saved_name,
    }
    if file_url:
        result_info["file_url"] = file_url

    return json.dumps(result_info)