"""GameGirl Color -- Asset animation tool via Grok Imagine.
Takes an existing game asset (image URL) and animates it
into a short video using xAI's Grok Imagine img2vid API.
The result is sent as an animated WebP or MP4.
# 🌀💀 CORRUPTED ANIMATION PIPELINE
"""
from __future__ import annotations
import asyncio
import base64
import jsonutil as json
import logging
import os
import tempfile
from pathlib import Path
from typing import Any, TYPE_CHECKING
import aiohttp
import httpx
from tools._safe_http import (
assert_safe_http_url,
safe_http_request,
safe_httpx_client,
)
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
# --- xAI Grok Imagine API ----------------------------------------------------
# NOTE(review): the paths below live under /images/ while _MODEL names a video
# model -- confirm both endpoints against the current xAI API reference.
_XAI_API_BASE = "https://api.x.ai/v1"
# POST target that starts a generation job.
_GENERATE_URL = _XAI_API_BASE + "/images/generations"
# GET target for job status; fill in with ``_POLL_URL.format(request_id=...)``.
_POLL_URL = _GENERATE_URL + "/{request_id}"
_MODEL = "grok-imagine-video"

# --- Polling -----------------------------------------------------------------
_POLL_INTERVAL = 3.0  # seconds to sleep between status polls
_POLL_TIMEOUT = 120.0  # seconds to keep polling before giving up

# --- Tool registration metadata ----------------------------------------------
TOOL_NAME = "animate_asset"

TOOL_DESCRIPTION = " ".join(
    (
        "Animate a game asset (still image) into a short video using",
        "xAI's Grok Imagine image-to-video API. Takes an existing game",
        "asset by name or URL and converts it into a dynamic animated",
        "clip (6-15 seconds). The result is sent to the current channel.",
        "Optionally provide a motion prompt to guide the animation style.",
    )
)

# JSON-Schema description of the arguments accepted by ``run``.
# Only ``asset_name`` is mandatory; key order is kept stable on purpose.
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "asset_name": {
            "type": "string",
            "description": (
                "Name of a saved game asset to animate, OR a direct image URL. "
                "If a name is given, looks up the URL from the game's asset "
                "registry."
            ),
        },
        "prompt": {
            "type": "string",
            "description": (
                "Optional motion prompt describing how the image should animate. "
                "E.g. 'gentle swaying motion, particles floating upward, "
                "dramatic lighting shift'. If omitted, the AI generates motion "
                "automatically."
            ),
        },
        "save_as": {
            "type": "string",
            "description": (
                "Optional name to save the animated result as a new game asset."
            ),
        },
        "output_format": {
            "type": "string",
            "description": (
                "Output format: 'webp' (default, animated, embeds inline in "
                "Discord), 'mp4' (video file), or 'gif' (legacy animated). "
                "WebP is recommended for quality and file size."
            ),
            "enum": ["webp", "mp4", "gif"],
        },
    },
    "required": ["asset_name"],
}
async def _resolve_api_key(ctx: ToolContext | None) -> tuple[str, bool]:
    """Resolve an xAI API key, preferring the invoking user's own key.

    Sources are tried in priority order:

    1. The user's stored key, fetched with
       ``tools.manage_api_keys.get_user_api_key`` (only when ``ctx`` carries a
       truthy ``user_id``). Any failure here is logged and treated as "no
       user key" so the shared fallbacks still apply.
    2. The shared key at ``ctx.config.api_keys["xai"]``.
    3. The ``XAI_API_KEY`` environment variable.

    Used by ``run`` in this module to authorize the Grok Imagine request.

    Args:
        ctx: Tool execution context supplying ``user_id``, ``redis``,
            ``channel_id`` and ``config``. May be ``None``, in which case only
            the environment fallback applies.

    Returns:
        ``(api_key, using_own_key)``. ``api_key`` is ``""`` when nothing could
        be resolved; ``using_own_key`` is ``True`` only when the user's
        personal key was found.
    """
    user_id = getattr(ctx, "user_id", None) if ctx is not None else None
    if user_id:
        try:
            # Imported lazily so a missing key-management module only disables
            # the per-user lookup instead of breaking the whole tool.
            from tools.manage_api_keys import get_user_api_key

            user_key = await get_user_api_key(
                user_id,
                "xai",
                redis_client=getattr(ctx, "redis", None),
                channel_id=getattr(ctx, "channel_id", None),
                config=getattr(ctx, "config", None),
            )
            if user_key:
                return user_key, True
        except Exception as exc:
            # Best effort: fall through to the shared key / environment.
            logger.warning("Failed to resolve user xAI key: %s", exc)

    # Shared key from the bot configuration. ``api_keys`` may be absent *or*
    # explicitly ``None``; both must fall through instead of raising.
    config = getattr(ctx, "config", None)
    api_keys = getattr(config, "api_keys", None) or {}
    shared_key = api_keys.get("xai", "")
    if shared_key:
        return shared_key, False

    # Last resort: process environment (may legitimately be empty).
    return os.environ.get("XAI_API_KEY", ""), False
async def _resolve_image_url(
    asset_name: str,
    channel_id: str,
    redis: Any = None,
) -> str | None:
    """Resolve a game-asset name (or pass-through URL) to an image URL.

    A value that looks like an ``http(s)`` URL is returned directly, with
    surrounding whitespace removed. The scheme check ignores case and leading
    whitespace so that tool arguments such as ``" https://..."`` or
    ``"HTTPS://..."`` are not mistaken for asset names.

    Anything else is treated as the name of a saved asset and looked up with
    ``game_assets.get_asset_by_name`` against the session returned by
    ``game_session.get_session(channel_id)``. The lookup only happens when
    that session is active and a ``redis`` client was supplied.

    Used by ``run`` in this module.

    Args:
        asset_name: Either a direct image URL or the name of a saved asset.
            The name is passed to the lookup exactly as given.
        channel_id: Channel used to locate the game session.
        redis: Client handed to the asset lookup; when falsy no lookup is
            attempted.

    Returns:
        The resolved image URL, or ``None`` when the game modules cannot be
        imported or no active session / matching asset exists.
    """
    candidate = asset_name.strip()
    if candidate.lower().startswith(("http://", "https://")):
        return candidate

    try:
        from game_session import get_session
        from game_assets import get_asset_by_name

        session = get_session(channel_id)
        if session and session.active and redis:
            asset = await get_asset_by_name(
                session.game_id,
                asset_name,
                redis=redis,
            )
            if asset:
                return asset.url
    except ImportError as exc:
        # Game modules are optional; without them only direct URLs work.
        logger.debug("Game asset lookup unavailable: %s", exc)
    return None
async def _download_as_base64(url: str) -> str | None:
    """Fetch an image and return it encoded as a ``data:`` URI.

    The image is downloaded with ``safe_httpx_client`` / ``safe_http_request``
    from ``tools._safe_http`` (30 s timeout, at most 5 redirects) and embedded
    as ``data:<mime>;base64,<payload>``. The media type is taken from the
    response's ``content-type`` header with any parameters removed, falling
    back to ``image/png`` when the header is absent or empty.

    Used by ``run`` in this module to build the seed image for the request.

    Args:
        url: Image URL; surrounding whitespace is stripped before fetching.

    Returns:
        The data URI, or ``None`` when the helpers raise ``ValueError``
        (logged as a blocked URL), the status is not 200, or any other error
        occurs during the download.
    """
    try:
        async with safe_httpx_client(timeout=httpx.Timeout(30.0)) as http:
            response = await safe_http_request(
                http, "GET", url.strip(), max_redirects=5
            )
            if response.status_code != 200:
                return None

            body = response.content
            header_value = response.headers.get("content-type") or "image/png"
            # Drop parameters such as "; charset=..." from the header.
            media_type = header_value.partition(";")[0].strip()
            encoded = base64.b64encode(body).decode("ascii")
            return f"data:{media_type};base64,{encoded}"
    except ValueError as exc:
        logger.warning("Blocked Grok image URL: %s", exc)
    except Exception as exc:
        logger.warning("Failed to download image for animation: %s", exc)
    return None
async def _convert_format(
    data: bytes,
    source_ext: str,
    target_ext: str,
) -> bytes | None:
    """Transcode animation bytes between formats using ffmpeg.

    Writes ``data`` to a temporary directory, runs ``ffmpeg`` through
    ``asyncio.create_subprocess_exec`` and reads the output file back.
    WebP output uses libwebp (lossy, quality 75, infinite loop); GIF output
    is resampled to 15 fps and 480 px width; any other target relies on
    ffmpeg's defaults for that extension.

    The ffmpeg run is limited to 60 seconds. If it is still running when the
    wait ends (timeout or task cancellation) the child process is killed and
    reaped before the temporary directory is removed, so no orphaned ffmpeg
    is left writing into deleted files.

    Used by ``run`` in this module, which falls back to the unconverted bytes
    when this returns ``None``.

    Args:
        data: Raw animation/video bytes to convert.
        source_ext: Extension describing ``data`` (e.g. ``"mp4"``).
        target_ext: Desired output extension (``"webp"``, ``"mp4"``, ``"gif"``).

    Returns:
        The converted bytes; ``data`` itself when both extensions are equal;
        or ``None`` when ffmpeg is missing, times out, exits non-zero, or any
        other error occurs.
    """
    if source_ext == target_ext:
        return data

    timeout_s = 60  # upper bound for a single ffmpeg run
    try:
        with tempfile.TemporaryDirectory() as tmpdir:
            src = Path(tmpdir) / f"input.{source_ext}"
            dst = Path(tmpdir) / f"output.{target_ext}"
            src.write_bytes(data)

            cmd = ["ffmpeg", "-y", "-i", str(src)]
            if target_ext == "webp":
                cmd += [
                    "-vcodec",
                    "libwebp",
                    "-lossless",
                    "0",
                    "-quality",
                    "75",
                    "-loop",
                    "0",
                ]
            elif target_ext == "gif":
                cmd += ["-vf", "fps=15,scale=480:-1:flags=lanczos"]
            # mp4: ffmpeg's default codec choice is used as-is.
            cmd.append(str(dst))

            try:
                proc = await asyncio.create_subprocess_exec(
                    *cmd,
                    # ffmpeg must never read the parent's stdin.
                    stdin=asyncio.subprocess.DEVNULL,
                    stdout=asyncio.subprocess.DEVNULL,
                    stderr=asyncio.subprocess.PIPE,
                )
            except FileNotFoundError:
                logger.warning("ffmpeg not found -- returning original format")
                return None

            try:
                _, stderr = await asyncio.wait_for(
                    proc.communicate(), timeout=timeout_s
                )
            finally:
                # wait_for() only cancels communicate(); the child keeps
                # running unless it is killed explicitly.
                if proc.returncode is None:
                    try:
                        proc.kill()
                    except ProcessLookupError:
                        pass  # exited between the check and the kill
                    await proc.wait()

            if proc.returncode != 0:
                logger.warning(
                    "ffmpeg conversion failed: %s",
                    stderr.decode(errors="replace")[:300],
                )
                return None
            return dst.read_bytes()
    except asyncio.TimeoutError:
        logger.warning("ffmpeg conversion timed out after %ss", timeout_s)
        return None
    except Exception as exc:
        logger.warning("Format conversion failed: %s", exc)
        return None
# --- animate_asset tool implementation ---
def _first_result_url(payload: Any) -> str:
    """Return ``payload["data"][0]["url"]`` as a string, or ``""`` if absent."""
    if not isinstance(payload, dict):
        return ""
    items = payload.get("data")
    if not (isinstance(items, list) and items and isinstance(items[0], dict)):
        return ""
    return str(items[0].get("url") or "")


def _animation_filename(label: str, ext: str) -> str:
    """Build an upload filename ``animated_<label>.<ext>`` without unsafe characters."""
    text = (label or "").strip()
    if text.lower().startswith(("http://", "https://")):
        # For URL inputs use the last path segment (minus query/fragment and
        # extension) instead of embedding "https://host/..." in the filename.
        path = text.split("?", 1)[0].split("#", 1)[0].rstrip("/")
        text = Path(path.rsplit("/", 1)[-1]).stem
    cleaned = "".join(
        ch if (ch.isalnum() or ch in "-_.") else "_" for ch in text
    )
    cleaned = cleaned[:30].strip("._") or "asset"
    return f"animated_{cleaned}.{ext}"


async def _poll_for_video(poll_url: str, headers: dict[str, str]) -> tuple[str, str]:
    """Poll a generation job until it finishes; return ``(video_url, error)``.

    Exactly one of the two strings is non-empty on a terminal result; both are
    empty when the job neither completed nor failed before the deadline, or
    completed without a result URL.
    """
    loop = asyncio.get_running_loop()
    # Wall-clock deadline: time spent inside slow poll requests counts too.
    # The final request may still overrun it by up to its own 15 s timeout.
    deadline = loop.time() + _POLL_TIMEOUT
    async with aiohttp.ClientSession() as session:
        while loop.time() < deadline:
            await asyncio.sleep(_POLL_INTERVAL)
            try:
                async with session.get(
                    poll_url,
                    headers=headers,
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    if resp.status != 200:
                        logger.debug(
                            "Grok Imagine poll returned HTTP %s", resp.status
                        )
                        continue
                    poll_data = await resp.json()
            except Exception as exc:
                # Transient network/parse errors: keep polling until deadline.
                logger.debug("Grok Imagine poll attempt failed: %s", exc)
                continue

            if not isinstance(poll_data, dict):
                continue
            status = poll_data.get("status", "")
            if status == "completed":
                return _first_result_url(poll_data), ""
            if status == "failed":
                error = poll_data.get("error", "Unknown")
                return "", f"Animation failed: {error}"
            # Any other status: still processing, keep polling.
    return "", ""


async def run(
    asset_name: str,
    prompt: str = "",
    save_as: str = "",
    output_format: str = "webp",
    ctx: ToolContext | None = None,
) -> str:
    """Animate a still game asset into a short clip via Grok Imagine img2vid.

    Entry point for the ``animate_asset`` tool. Steps, in order:

    1. Normalize ``output_format`` (unknown or missing values become
       ``"webp"``) and resolve an API key with ``_resolve_api_key``.
    2. Resolve the seed image with ``_resolve_image_url`` and embed it as a
       data URI with ``_download_as_base64``.
    3. POST the job to ``_GENERATE_URL``. If the response already carries a
       result URL it is used directly; otherwise the job is polled at
       ``_POLL_URL`` every ``_POLL_INTERVAL`` seconds until it completes,
       fails, or ``_POLL_TIMEOUT`` seconds of wall-clock time have passed.
    4. Check the result URL with ``assert_safe_http_url``, download it, and
       transcode it with ``_convert_format`` when its format differs from the
       requested one (falling back to the original bytes on failure).
    5. Send the file with ``ctx.adapter.send_file`` and append a record to
       ``ctx.sent_files``.
    6. When ``save_as`` is given and the channel's game session is active,
       register the result with ``game_assets.upload_asset``.

    Args:
        asset_name: Name of a saved game asset to animate, or a direct image
            URL.
        prompt: Optional motion prompt; a generic default is sent when empty.
        save_as: Optional name under which to register the result as a new
            game asset.
        output_format: ``"webp"``, ``"mp4"`` or ``"gif"``.
        ctx: Tool execution context providing ``channel_id``, ``redis``,
            ``user_id``, ``adapter`` and ``sent_files``.

    Returns:
        A JSON string. On success it contains ``success``, ``source``,
        ``animated_url``, ``filename``, ``format`` (the format actually sent),
        ``saved_as`` (the asset name only if the save really happened, else
        ``null``), plus ``save_warning`` and ``file_url`` when applicable.
        On failure it is ``{"error": "..."}``.
    """
    # Tolerate a null/odd value from the caller instead of crashing.
    output_format = str(output_format or "webp").lower().strip()
    if output_format not in ("webp", "mp4", "gif"):
        output_format = "webp"

    api_key, _using_own_key = await _resolve_api_key(ctx)
    if not api_key:
        return json.dumps(
            {
                "error": "No xAI API key available. "
                "Provide your own key via: set_user_api_key "
                "service=xai api_key=YOUR_KEY",
            }
        )
    if ctx is None:
        return json.dumps({"error": "No tool context available."})

    channel_id = str(ctx.channel_id)
    redis = getattr(ctx, "redis", None)

    # Resolve the seed image and embed it for the API request.
    image_url = await _resolve_image_url(asset_name, channel_id, redis)
    if not image_url:
        return json.dumps(
            {
                "error": f"Could not find asset '{asset_name}'. "
                "Provide a valid asset name or direct image URL.",
            }
        )
    image_data_uri = await _download_as_base64(image_url)
    if not image_data_uri:
        return json.dumps(
            {
                "error": f"Failed to download image from '{image_url}'.",
            }
        )

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }

    # Step 1: start the generation job.
    payload: dict[str, Any] = {
        "model": _MODEL,
        "prompt": prompt or "Subtle, natural animation with smooth motion",
        "image": image_data_uri,
    }
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(
                _GENERATE_URL,
                headers=headers,
                json=payload,
                timeout=aiohttp.ClientTimeout(total=30),
            ) as resp:
                if resp.status not in (200, 201, 202):
                    error_text = await resp.text()
                    return json.dumps(
                        {
                            "error": f"Grok Imagine API error ({resp.status}): "
                            f"{error_text[:500]}",
                        }
                    )
                init_data = await resp.json()
    except Exception as exc:
        return json.dumps({"error": f"Animation request failed: {exc}"})

    if not isinstance(init_data, dict):
        return json.dumps(
            {"error": "Grok Imagine API returned an unexpected response."}
        )

    # The API may answer with the result directly or with a job id to poll.
    request_id = init_data.get("request_id", "")
    video_url = _first_result_url(init_data)

    # Step 2: poll for completion when no direct result was returned.
    if not video_url and request_id:
        video_url, poll_error = await _poll_for_video(
            _POLL_URL.format(request_id=request_id), headers
        )
        if poll_error:
            return json.dumps({"error": poll_error})

    if not video_url:
        return json.dumps(
            {
                "error": "Animation timed out or no result returned.",
            }
        )

    # Step 3: download the result, convert if needed, and send it.
    try:
        safe_video = assert_safe_http_url(str(video_url).strip())
    except ValueError as exc:
        return json.dumps({"error": f"Result URL blocked: {exc}"})

    try:
        async with safe_httpx_client(timeout=httpx.Timeout(60.0)) as client:
            resp = await safe_http_request(
                client, "GET", safe_video, max_redirects=5
            )
            if resp.status_code != 200:
                return json.dumps(
                    {
                        "error": f"Failed to download animated result "
                        f"({resp.status_code}).",
                    }
                )
            video_data = resp.content
            ct = (resp.headers.get("content-type") or "video/mp4").split(";")[0].strip()

        # Detect the delivered format from the content type (default: mp4).
        source_ext = "mp4"
        if "webp" in ct:
            source_ext = "webp"
        elif "gif" in ct:
            source_ext = "gif"

        # Convert to the requested format; keep the original on failure.
        final_data = video_data
        final_ext = source_ext
        if output_format != source_ext:
            converted = await _convert_format(
                video_data,
                source_ext,
                output_format,
            )
            if converted:
                final_data = converted
                final_ext = output_format
            else:
                logger.info(
                    "Conversion to %s failed, sending as %s",
                    output_format,
                    source_ext,
                )

        ct_map = {
            "webp": "image/webp",
            "mp4": "video/mp4",
            "gif": "image/gif",
        }
        final_ct = ct_map.get(final_ext, "video/mp4")

        # asset_name may be a URL, so the filename has to be sanitized.
        fname = _animation_filename(save_as or asset_name, final_ext)
        file_url = await ctx.adapter.send_file(
            channel_id,
            final_data,
            fname,
            final_ct,
        )
        ctx.sent_files.append(
            {
                "data": final_data,
                "filename": fname,
                "mimetype": final_ct,
                "file_url": file_url or "",
            }
        )
    except Exception as exc:
        return json.dumps({"error": f"Failed to send animation: {exc}"})

    # Step 4: optionally register the result as a new game asset.
    saved = False
    save_warning = ""
    if save_as:
        try:
            from game_session import get_session
            from game_assets import upload_asset

            session_obj = get_session(channel_id)
            if session_obj and session_obj.active:
                user_id = str(getattr(ctx, "user_id", "") or "")
                # NOTE(review): this stores the provider's result URL (in its
                # original format), not the file that was just sent -- confirm
                # that URL stays valid for as long as the asset is needed.
                await upload_asset(
                    game_id=session_obj.game_id,
                    name=save_as,
                    category="special",
                    url=video_url,
                    user_id=user_id,
                    turn=session_obj.turn_number,
                    redis=redis,
                )
                # Assumes upload_asset signals failure by raising.
                saved = True
            else:
                save_warning = (
                    "No active game session; the animation was not saved "
                    "as an asset."
                )
        except Exception as exc:
            logger.warning("Failed to save animated asset: %s", exc)
            save_warning = "Saving the animation as a game asset failed."

    result_info: dict = {
        "success": True,
        "source": asset_name,
        "animated_url": video_url,
        "filename": fname,
        "format": final_ext,
        # Only claim the asset was saved when the upload actually went through.
        "saved_as": save_as if saved else None,
    }
    if save_warning:
        result_info["save_warning"] = save_warning
    if file_url:
        result_info["file_url"] = file_url
    return json.dumps(result_info)