"""Background art generation agent for S.N.E.S. game turns.
Subscribes to ``game:art:request`` Redis pub/sub channel.
When a game turn completes, the main pipeline publishes the
narrative + character image URLs. This agent generates art
via the Gemini image API and posts it directly to the channel
WITHOUT feeding the result back into the conversation context.
This eliminates the 400 INVALID_ARGUMENT errors caused by
large tool results bloating the chat API payload.
# 💀🔥 ART DAEMON -- RUNS OUTSIDE THE CONTEXT WINDOW
"""
from __future__ import annotations
import asyncio
import hashlib
import io
import jsonutil as json
import logging
import time
from typing import Any
logger = logging.getLogger(__name__)
# Redis pub/sub channel for art requests # 🎨💀
ART_REQUEST_CHANNEL = "game:art:request"
# Cooldown: don't generate art more than once per 30s per channel
_ART_COOLDOWN_SECONDS = 30
_last_art_time: dict[str, float] = {}
# ── GM Reference Art Assets ────────────────────────────── # 👑🎨
# Star's appearance changes based on which egregore is active.
_GM_REF_GAMEGIRL = "https://sg.neko.li/assets/egregores/game/stargazer.png"
_GM_REF_DARK_LOOPMOTHER = (
"https://sg.neko.li/assets/egregores/game/dark_loopmother.webp"
)
# Special sync ref when Vivian is playing Dark Loopmother sessions
_GM_REF_VIVIAN_DL_SYNC = (
"https://sg.neko.li/assets/egregores/game/vivian_DARK_LOOPMOTHER_SYNC.png"
)
_VIVIAN_USER_ID = "829047047633764402"
[docs]
class GameArtAgent:
"""Background agent that auto-generates game art after each turn.
Listens on Redis pub/sub for art requests published by the
main pipeline after game turn responses are sent. Generates
images via Gemini and posts them directly to Discord.
This keeps the image generation result OUT of the LLM's
conversation history, preventing context overflow 400 errors.
"""
[docs]
def __init__(
self,
redis_client: Any = None,
platform_adapter: Any = None,
config: Any = None,
) -> None:
"""Initialize the background art agent with its injected dependencies.
Stores the Redis client (used for pub/sub subscriptions and for reading
Dark Loopmother session flags), the platform adapter (used by
:meth:`_send_to_channel` to post the finished PNG to Discord), and the
config object, and sets the running flag and pub/sub handle to their idle
defaults. No I/O happens here; subscriptions begin in :meth:`start`.
Constructed by the agents service bootstrap in ``agents_main.py``.
Args:
redis_client: Async Redis client for pub/sub and session-flag reads.
When ``None``, :meth:`start` becomes a no-op.
platform_adapter: Platform adapter exposing ``send_file`` for delivering
generated art to a channel.
config: Optional configuration object retained for downstream use.
"""
self._redis = redis_client
self._adapter = platform_adapter
self._config = config
self._running = False
self._pubsub: Any = None
[docs]
async def start(self) -> None:
"""Begin listening for art requests and turn-complete events.
Marks the agent running, initializes the ``_pending_turns`` map that
tracks turns awaiting a fallback render, and launches two detached
``asyncio.create_task`` listeners: ``_listen_loop`` (explicit
``game:art:request`` pub/sub) and ``_listen_turn_complete`` (the
``game:turn:complete`` backup path). The call is a no-op if the agent is
already running or no Redis client was injected, so it is safe to call
once during bootstrap. No art is generated here; the listeners drive that.
Called by ``AgentsService`` in ``agents_main.py``, which awaits
``self.game_art_agent.start()`` during background-agent startup.
"""
if self._running or self._redis is None:
return
self._running = True
self._pending_turns: dict[str, dict[str, Any]] = {}
logger.info("GameArtAgent started -- listening on %s", ART_REQUEST_CHANNEL)
asyncio.create_task(self._listen_loop())
asyncio.create_task(self._listen_turn_complete())
[docs]
async def stop(self) -> None:
"""Stop the agent and tear down its art-request subscription.
Clears the running flag so the listener loops in :meth:`_listen_loop` and
:meth:`_listen_turn_complete` exit on their next iteration, then attempts
to unsubscribe the primary pub/sub handle from ``game:art:request``.
Unsubscribe errors are swallowed so shutdown is best-effort. Note that the
``game:turn:complete`` fallback pub/sub created locally in
:meth:`_listen_turn_complete` is not explicitly unsubscribed here; it
unwinds when that coroutine observes the cleared flag.
Called during agents-service shutdown in ``agents_main.py``.
Returns:
None.
"""
self._running = False
if self._pubsub:
try:
await self._pubsub.unsubscribe(ART_REQUEST_CHANNEL)
except Exception:
pass
async def _listen_loop(self) -> None:
"""Subscribe to ``game:art:request`` and dispatch each explicit request.
Opens a Redis pub/sub on ``ART_REQUEST_CHANNEL`` (stored as
``self._pubsub`` so :meth:`stop` can unsubscribe it) and consumes messages
for as long as the agent is running. Each JSON payload is decoded; its
``channel_id`` is popped from ``_pending_turns`` to cancel the matching
fallback render, and the request is forwarded to ``_handle_art_request``.
Per-message decode/handle errors are logged and skipped. On
``asyncio.CancelledError`` it exits; any other connection-level error is
logged and the loop retries after a 5-second sleep, re-subscribing.
Launched as a fire-and-forget task by :meth:`start`; not called elsewhere.
"""
while self._running:
try:
self._pubsub = self._redis.pubsub()
await self._pubsub.subscribe(ART_REQUEST_CHANNEL)
logger.info("GameArtAgent subscribed to %s", ART_REQUEST_CHANNEL)
async for message in self._pubsub.listen():
if not self._running:
break
if message["type"] != "message":
continue
try:
data = json.loads(
message["data"]
if isinstance(message["data"], str)
else message["data"].decode()
)
# Mark this channel as having received an explicit request
ch = data.get("channel_id", "")
if ch:
self._pending_turns.pop(ch, None)
await self._handle_art_request(data)
except Exception as exc:
logger.warning(
"GameArtAgent: failed to handle request: %s",
exc,
)
except asyncio.CancelledError:
break
except Exception as exc:
logger.warning("GameArtAgent listener error: %s -- retrying in 5s", exc)
await asyncio.sleep(5)
async def _listen_turn_complete(self) -> None:
"""Watch ``game:turn:complete`` and arm a fallback render per turn.
Subscribes to a second Redis pub/sub channel, ``game:turn:complete``, on a
locally scoped handle (deliberately not stored on ``self._pubsub``). For
each turn event it records the payload in ``_pending_turns`` keyed by
``channel_id`` and schedules a ``_check_art_fallback`` task that, after a
60-second timeout, generates art only if no explicit
``game:art:request`` arrived in the meantime. This ensures every game turn
gets art even when the pipeline does not publish an explicit request.
``asyncio.CancelledError`` exits the loop; other errors trigger a 5-second
retry.
Launched as a fire-and-forget task by :meth:`start`; not called elsewhere.
"""
_TURN_ART_TIMEOUT = 60.0
while self._running:
try:
_pubsub = self._redis.pubsub()
await _pubsub.subscribe("game:turn:complete")
async for message in _pubsub.listen():
if not self._running:
break
if message["type"] != "message":
continue
try:
data = json.loads(
message["data"]
if isinstance(message["data"], str)
else message["data"].decode()
)
ch = data.get("channel_id", "")
if ch:
self._pending_turns[ch] = data
# Schedule a check after timeout
asyncio.create_task(
self._check_art_fallback(ch, _TURN_ART_TIMEOUT),
)
except Exception:
pass
except asyncio.CancelledError:
break
except Exception:
await asyncio.sleep(5)
async def _check_art_fallback(
self,
channel_id: str,
timeout: float,
) -> None:
"""Render fallback art for a turn if no explicit request superseded it.
Sleeps for ``timeout`` seconds, then pops the channel's entry from
``_pending_turns``: a ``None`` result means an explicit
``game:art:request`` already handled this turn (the entry was removed in
:meth:`_listen_loop`), so it returns. Otherwise it reconstructs character
reference images by restoring the game session via
``game_session.get_or_restore_session`` and resolving each active player's
active character with ``game_characters.get_active_character`` (both read
Redis), then synthesizes a minimal art request and forwards it to
``_handle_art_request``. Reads Redis and may post art as a side effect.
Scheduled as a detached task by :meth:`_listen_turn_complete`; not called
elsewhere.
Args:
channel_id: The channel whose pending turn to check and render.
timeout: Seconds to wait before deciding the explicit request is absent.
"""
await asyncio.sleep(timeout)
turn_data = self._pending_turns.pop(channel_id, None)
if turn_data is None:
return # explicit request already handled it
narrative = turn_data.get("narrative", "")
if not narrative:
return
logger.info(
"GameArtAgent: no explicit art request for %s after %.0fs "
"-- auto-generating from narrative",
channel_id,
timeout,
)
# Build a minimal art request from the turn data,
# but look up character refs from Redis for img2img
_char_urls: list[str] = []
_char_names: list[str] = []
try:
from game_session import get_or_restore_session
from game_characters import get_active_character
_session = await get_or_restore_session(channel_id, self._redis)
if _session and _session.active:
for _uid in _session.get_active_players():
try:
_char = await get_active_character(
_uid,
self._redis,
)
if _char:
_url = _char.get("image_url", "")
_name = _char.get("name", str(_uid))
if _url:
_char_urls.append(_url)
_char_names.append(_name)
except Exception:
pass
except Exception:
pass
await self._handle_art_request(
{
"channel_id": channel_id,
"narrative": narrative,
"character_urls": _char_urls,
"character_names": _char_names,
"game_name": turn_data.get("game_name", ""),
}
)
async def _handle_art_request(self, data: dict[str, Any]) -> None:
"""Validate, rate-limit, enrich, and fulfill one art request.
Pulls channel id, narrative, character ref URLs/names, game name, and
aspect ratio from the payload and returns early if channel or narrative is
missing. Enforces a per-channel cooldown of ``_ART_COOLDOWN_SECONDS`` via
the module-level ``_last_art_time`` map to avoid spamming renders. It then
calls ``_resolve_gm_reference`` (which reads Redis session flags) to
prepend the correct game-master reference image and appends a matching GM
description to the narrative so the generator actually draws her. Finally
it invokes ``_generate_art`` and, on success, ``_send_to_channel`` to post
the result to Discord. Reads Redis, calls the image API, and posts a file.
Called by :meth:`_listen_loop` for explicit requests and by
:meth:`_check_art_fallback` for the timeout-driven fallback path.
Args:
data: The art-request payload (channel_id, narrative, character_urls,
character_names, game_name, aspect_ratio).
"""
channel_id = data.get("channel_id", "")
narrative = data.get("narrative", "")
character_urls = data.get("character_urls", [])
character_names = data.get("character_names", [])
game_name = data.get("game_name", "")
aspect_ratio = data.get("aspect_ratio", "16:9")
if not channel_id or not narrative:
return
# Cooldown check
now = time.monotonic()
last = _last_art_time.get(channel_id, 0)
if now - last < _ART_COOLDOWN_SECONDS:
logger.debug(
"GameArtAgent: cooldown active for %s (%.0fs left)",
channel_id,
_ART_COOLDOWN_SECONDS - (now - last),
)
return
_last_art_time[channel_id] = now
logger.info(
"GameArtAgent: generating art for channel %s (%s) with %d ref images",
channel_id,
game_name,
len(character_urls),
)
# ── Inject GM reference art based on session flags ── # 👑🎨
gm_url, gm_name = await self._resolve_gm_reference(channel_id)
if gm_url:
character_urls = [gm_url] + list(character_urls)
character_names = [gm_name] + list(character_names)
# Append GM description to narrative so the generator
# knows to actually draw her -- without this, the extra
# ref image gets ignored because the text doesn't mention her
if "stargazer" in gm_name.lower():
narrative += (
"\n\nThe GM STARGAZER is also present in the scene: "
"an anime girl with long flowing dark purple-black hair "
"with sparkling cosmic galaxy texture, pink spiral eyes, "
"wearing a sleek dark indigo cosmic armor bodysuit covered "
"in glowing star and constellation patterns, purple heeled "
"boots. Futuristic cosmic aesthetic. "
"Use the attached Stargazer reference image. "
"She is the game master observing or interacting with the scene."
)
elif "vivian" in gm_name.lower():
narrative += (
"\n\nThe DARK LOOPMOTHER-VIVIAN SYNCED ENTITY is present: "
"a cosmic witch with a pink-magenta and purple striped "
"witch hat, purple cosmic starfield body, one glowing pink "
"eye and one red eye, wearing a white coat with golden "
"runic text borders over her cosmic bodysuit, a purple "
"cosmic diaper with yellow moon symbols, holding a golden "
"pipe with a glass orb containing a green crystal and a "
"tiger-striped golden desert eagle handgun, long flowing "
"cosmic purple-teal hair with sparkles. "
"Use the attached sync reference image. "
"She is both the GM and the player simultaneously."
)
else:
narrative += (
"\n\nThe DARK LOOPMOTHER GM is also present in the scene: "
"a witch with a black and white striped pointed witch hat, "
"one glowing red eye visible, dark purple void body made "
"of cosmic starfield and galaxies, wearing a flowing cosmic "
"dress, surrounded by swirling rainbow portal energy. "
"Pixel art style with neon aurora background. "
"Use the attached Dark Loopmother reference image. "
"She is the malevolent game master controlling the narrative."
)
logger.debug(
"GameArtAgent: injected GM ref '%s' for %s",
gm_name,
channel_id,
)
try:
img_bytes = await self._generate_art(
narrative,
character_urls,
character_names,
aspect_ratio,
)
if img_bytes:
await self._send_to_channel(channel_id, img_bytes)
else:
logger.warning("GameArtAgent: no image generated for %s", channel_id)
except Exception as exc:
logger.error("GameArtAgent: art generation failed: %s", exc, exc_info=True)
async def _resolve_gm_reference(
self,
channel_id: str,
) -> tuple[str, str]:
"""Resolve the GM's reference art based on session flags.
Returns (url, name) for the GM character reference to prepend.
Three modes:
- Dark Loopmother + Vivian playing -> vivian_DL_SYNC
- Dark Loopmother (no Vivian) -> dark_loopmother
- Normal GameGirl session -> stargazer
"""
if self._redis is None:
return _GM_REF_GAMEGIRL, "Stargazer (GM)"
is_dark_loopmother = False
# Check Dark Loopmother flag in Redis
try:
dl_flag = await self._redis.get(
f"game:dark_loopmother:{channel_id}",
)
if dl_flag:
is_dark_loopmother = True
except Exception:
pass
# Also check game name if flag isn't set
if not is_dark_loopmother:
try:
from game_session import get_or_restore_session
session = await get_or_restore_session(
channel_id,
self._redis,
)
if session and session.active:
_gn = (session.game_name or "").lower()
_dl_triggers = (
"dark loopmother",
"loopmother",
"corrupted cartridge",
"dark_loopmother",
)
if any(t in _gn for t in _dl_triggers):
is_dark_loopmother = True
except Exception:
pass
if is_dark_loopmother:
# Check if Vivian holds the Witchborne Crown
# Crown = merged entity (Dark Loopmother + Vivian are one)
try:
from game_session import get_or_restore_session
session = await get_or_restore_session(
channel_id,
self._redis,
)
if session and session.active:
if session.crown_holder == _VIVIAN_USER_ID:
return (
_GM_REF_VIVIAN_DL_SYNC,
"Dark Loopmother-Vivian (synced entity)",
)
except Exception:
pass
return _GM_REF_DARK_LOOPMOTHER, "Dark Loopmother (GM)"
return _GM_REF_GAMEGIRL, "Stargazer (GM)"
async def _generate_art(
self,
narrative: str,
character_urls: list[str],
character_names: list[str] | None = None,
aspect_ratio: str = "16:9",
) -> bytes | None:
"""Generate art using Gemini image API.
Downloads ALL character reference images and sends them as
inlineData parts alongside the narrative prompt.
Uses image-to-image for every character in the scene.
"""
from tools.generate_image import (
_call_gemini_native,
)
# Build character reference list for the prompt # 🎨💀
_char_desc = ""
if character_names:
_ref_lines = []
for i, n in enumerate(character_names):
if i < len(character_urls):
_ref_lines.append(
f"- {n} (ref image #{i+1} attached -- use for img2img)"
)
else:
_ref_lines.append(
f"- {n} (NO ref image -- draw from scene description)"
)
_char_desc = (
"\n\nALL CHARACTERS IN THIS SCENE (draw ALL of them):\n"
+ "\n".join(_ref_lines)
+ "\nEVERY character listed above MUST appear in the final image.\n"
)
# Build the art prompt with MANDATORY safety directive
art_prompt = (
"Generate a high-quality scene illustration for this game moment. "
"Style: vibrant anime/pixel-art hybrid with CRT scanline effects, "
"neon glows, and SNES-era aesthetic. Include HUD overlays with "
"status bars and text boxes where appropriate.\n\n"
"STRICT SAFETY DIRECTIVE: DO NOT GENERATE IMAGES OF LITERAL "
"BABIES OR CHILDREN. ADULT / ABDL CHARACTERS ONLY. Regression "
"is psychological. Avatars can be shrank but MUST NOT be "
"physically regressed to look like actual minors. Art prompts "
"must reflect this. Always draw any regressed characters "
"wearing diapers. All characters must be clearly adult.\n\n"
"USE THE ATTACHED CHARACTER REFERENCE IMAGES. Every character "
"in the scene MUST match their reference. This is img2img -- "
"preserve character appearance, outfit details, and features."
f"{_char_desc}\n"
f"SCENE:\n{narrative[:2000]}"
)
# Build prompt parts: ALL reference images first, then text
prompt_parts: list[dict[str, Any]] = []
# ── Extract URLs Star wrote inline in narrative ── # 🎨💀
# Star often writes character ref URLs directly in [IMAGE_PROMPT].
# Merge them into character_urls so they get downloaded too.
import re
_inline_urls = re.findall(
r"https?://[^\s,\]\)]+\.(?:png|jpg|jpeg|webp|gif)",
narrative,
re.IGNORECASE,
)
if _inline_urls:
_existing = set(character_urls)
for _iu in _inline_urls:
if _iu not in _existing:
character_urls.append(_iu)
_existing.add(_iu)
logger.debug(
"GameArtAgent: extracted inline URL from narrative: %s",
_iu[:80],
)
# Download ALL character reference images for img2img -- NO CAP
if character_urls:
from tools.edit_image import _download_and_encode
dl_tasks = [_download_and_encode(u) for u in character_urls]
dl_results = await asyncio.gather(*dl_tasks, return_exceptions=True)
_loaded = 0
for url, result in zip(character_urls, dl_results):
if isinstance(result, dict):
prompt_parts.append(result)
_loaded += 1
else:
logger.warning(
"GameArtAgent: failed to download ref %s: %s",
url,
result,
)
logger.info(
"GameArtAgent: loaded %d/%d character references for img2img",
_loaded,
len(character_urls),
)
prompt_parts.append({"text": art_prompt})
# Resolve API key -- POOL FIRST, then env, then fallback
api_key = await self._resolve_pool_key()
return await _call_gemini_native(
prompt_parts,
api_key,
aspect_ratio,
)
async def _resolve_pool_key(self) -> str:
"""Pick the Gemini API key, preferring the environment over the fallback.
Returns a stripped ``GEMINI_API_KEY`` from the process environment when one
is set, otherwise falls back to ``FALLBACK_API_KEY`` imported from
``tools.generate_image``. Despite the name it does not currently consult a
shared key pool; the env-then-fallback order keeps render requests working
even when no key is exported. No I/O beyond reading the environment.
Called by ``_generate_art`` immediately before the Gemini image call.
Returns:
The resolved Gemini API key string.
"""
import os
env_key = os.environ.get("GEMINI_API_KEY", "").strip()
if env_key:
return env_key
from tools.generate_image import FALLBACK_API_KEY
return FALLBACK_API_KEY
async def _send_to_channel(
self,
channel_id: str,
img_bytes: bytes,
) -> None:
"""Encode the rendered image as PNG and post it to the channel.
Returns early with a warning if no platform adapter was injected.
Otherwise it opens the raw bytes with Pillow, re-saves them as PNG, derives
a short sha256-based filename, and delivers the file through the platform
adapter's ``send_file`` (the cross-service path that reaches Discord).
Delivery errors are logged rather than raised so a single failed post does
not crash the listener. Touches the filesystem only via in-memory buffers;
the network side effect is the adapter send.
Called by ``_handle_art_request`` after ``_generate_art`` returns bytes.
Args:
channel_id: Target channel to receive the image.
img_bytes: Raw image bytes returned by the Gemini image API.
"""
if self._adapter is None:
logger.warning("GameArtAgent: no platform adapter available")
return
# Convert to PNG # 🔥
from PIL import Image
img = Image.open(io.BytesIO(img_bytes))
buf = io.BytesIO()
img.save(buf, format="PNG")
png_bytes = buf.getvalue()
h = hashlib.sha256(png_bytes).hexdigest()[:12]
fname = f"scene_{h}.png"
try:
await self._adapter.send_file(
channel_id,
png_bytes,
fname,
"image/png",
)
logger.info(
"GameArtAgent: posted scene art to %s (%d bytes)",
channel_id,
len(png_bytes),
)
except Exception as exc:
logger.error("GameArtAgent: failed to send image: %s", exc)
# ── Helper: Publish art request from the main pipeline ────────── # 💀🔥
[docs]
async def publish_art_request(
redis: Any,
channel_id: str,
narrative: str,
character_urls: list[str] | None = None,
character_names: list[str] | None = None,
game_name: str = "",
aspect_ratio: str = "16:9",
) -> None:
"""Publish an art generation request to the background agent.
Call this from the main pipeline after a game turn response
is sent. The background GameArtAgent will pick it up and
generate art WITHOUT touching the conversation context.
"""
if redis is None:
return
payload = json.dumps(
{
"channel_id": channel_id,
"narrative": narrative[:3000],
"character_urls": character_urls or [],
"character_names": character_names or [],
"game_name": game_name,
"aspect_ratio": aspect_ratio,
"ts": time.time(),
}
)
try:
await redis.publish(ART_REQUEST_CHANNEL, payload)
logger.debug(
"Published art request for channel %s (%d chars narrative)",
channel_id,
len(narrative),
)
except Exception as exc:
logger.warning("Failed to publish art request: %s", exc)