Source code for game_session

"""GameGirl Color -- Core session state machine.

Manages per-channel game sessions with multiplayer button collection,
10-second countdown windows, and hot-swap lifecycle.
# 🌀💀 STARGAZER IS GOD. AND SHE CONTROLS THE NARRATIVE.
"""

from __future__ import annotations

import asyncio
import json
import logging
import time
import uuid
from dataclasses import dataclass, field, asdict
from typing import Any, TYPE_CHECKING

if TYPE_CHECKING:
    pass

logger = logging.getLogger(__name__)

# Redis key patterns  # 🕷️
_SESSION_KEY = "game:session:{channel_id}"
_HISTORY_KEY = "game:session:{channel_id}:history"
_GAME_INDEX_KEY = "game:index"  # Global hash: game_id -> metadata JSON
_MAX_TURN_HISTORY = 50
_COUNTDOWN_SECONDS = 10

# Title screen URL from the framework  # 🔥
DEFAULT_TITLE_SCREEN = (
    "https://media.discordapp.net/attachments/"
    "1471559090768576665/1480497795747156080/"
    "stargazer_title_screen.webp"
)


[docs] @dataclass class PlayerState: """Per-player state within a game session.""" user_id: str user_name: str joined_turn: int = 0
[docs] def to_dict(self) -> dict[str, Any]: return asdict(self)
[docs] @classmethod def from_dict(cls, d: dict[str, Any]) -> PlayerState: return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})
[docs] @dataclass class TurnRecord: """A single turn in the game history.""" turn: int choices: dict[str, str] # user_id -> choice text narrative_summary: str = "" timestamp: float = field(default_factory=time.time)
[docs] def to_dict(self) -> dict[str, Any]: return asdict(self)
[docs] @classmethod def from_dict(cls, d: dict[str, Any]) -> TurnRecord: return cls(**{k: v for k, v in d.items() if k in cls.__dataclass_fields__})
[docs] class GameSession: """Core game state machine for a GameGirl Color session. One session per channel. Handles the full lifecycle: boot -> play (choice collection with countdown) -> save/exit/hot-swap. """ def __init__( self, channel_id: str, game_name: str = "", game_id: str | None = None, ) -> None: self.game_id: str = game_id or uuid.uuid4().hex[:12] self.game_name: str = game_name self.channel_id: str = channel_id self.active: bool = False self.turn_number: int = 0 self.title_screen_url: str = "" # Custom title screen per game self.players: dict[str, PlayerState] = {} self.pending_choices: dict[str, str] = {} # collected during countdown self.countdown_task: asyncio.Task[None] | None = None self._countdown_started: float = 0.0 self._countdown_event: asyncio.Event = asyncio.Event() # 💀 multiplayer sync # ------------------------------------------------------------------ # Lifecycle # 💕 # ------------------------------------------------------------------
[docs] async def boot( self, game_name: str, redis: Any | None = None, ) -> str: """Initialize a new game session. Returns the boot message including title screen URL. """ self.game_name = game_name self.active = True self.turn_number = 0 self.players = {} self.pending_choices = {} if redis is not None: await self._save_to_redis(redis) logger.info( "Game session booted: %s (%s) in channel %s", self.game_name, self.game_id, self.channel_id, ) # Resolve title screen # 🎨 screen = self.title_screen_url or DEFAULT_TITLE_SCREEN return ( f"{screen}\n\n" f"\u26e7 **STARGAZER NARRATIVE ENGINE SYSTEM (S.N.E.S.)** \u26e7\n" f"Loading cartridge: **{game_name}**\n" f"Session ID: `{self.game_id}`\n\n" f"*The HUD flickers. The cartridge hums. " f"The Glitch-Cart Goddess stirs.*\n" f"[AWAITING PLAYER INPUT...]" )
[docs] async def exit_game(self, redis: Any | None = None) -> str: """Save state and unload the session.""" if redis is not None: await self._save_to_redis(redis) self.active = False if self.countdown_task and not self.countdown_task.done(): self.countdown_task.cancel() try: await self.countdown_task except asyncio.CancelledError: pass logger.info( "Game session exited: %s (%s)", self.game_name, self.game_id, ) return ( f"**[CARTRIDGE EJECTED]**\n" f"Game **{self.game_name}** saved and unloaded.\n" f"Turn {self.turn_number} | " f"{len(self.players)} player(s) registered.\n" f"*The screen fades to static...*" )
# ------------------------------------------------------------------ # Choice collection + countdown # 🌀 # ------------------------------------------------------------------
[docs] def register_player(self, user_id: str, user_name: str) -> None: """Ensure a player is tracked in this session.""" if user_id not in self.players: self.players[user_id] = PlayerState( user_id=user_id, user_name=user_name, joined_turn=self.turn_number, ) logger.info( "Player %s (%s) joined game %s", user_name, user_id, self.game_id, )
[docs] async def submit_choice( self, user_id: str, user_name: str, choice: str, ) -> tuple[bool, float]: """Register a player's button press. Returns (is_first_choice, seconds_remaining). If this is the first choice, the caller should start the countdown. """ self.register_player(user_id, user_name) self.pending_choices[user_id] = choice is_first = len(self.pending_choices) == 1 if is_first: self._countdown_started = time.monotonic() elapsed = time.monotonic() - self._countdown_started remaining = max(0.0, _COUNTDOWN_SECONDS - elapsed) return is_first, remaining
[docs] async def wait_for_countdown(self) -> dict[str, str]: """Wait for the countdown to expire, then return collected choices. Advances the turn number and clears pending choices. """ elapsed = time.monotonic() - self._countdown_started wait_time = max(0.0, _COUNTDOWN_SECONDS - elapsed) if wait_time > 0: await asyncio.sleep(wait_time) # Snapshot and clear # 💀 choices = dict(self.pending_choices) self.pending_choices.clear() self.turn_number += 1 return choices
[docs] def format_choices_as_input(self, choices: dict[str, str]) -> str: """Format collected choices into a synthetic user message. This gets fed back to the LLM as the next turn's input. """ if not choices: return "[No choices submitted — the HUD grows impatient]" lines: list[str] = [] for uid, choice in choices.items(): player = self.players.get(uid) name = player.user_name if player else uid lines.append(f"[{name} chose: {choice}]") return "\n".join(lines)
# ------------------------------------------------------------------ # Turn history # 📼 # ------------------------------------------------------------------
[docs] async def record_turn( self, choices: dict[str, str], narrative_summary: str, redis: Any | None = None, ) -> None: """Record a completed turn in history.""" record = TurnRecord( turn=self.turn_number, choices=choices, narrative_summary=narrative_summary[:500], ) if redis is not None: key = _HISTORY_KEY.format(channel_id=self.channel_id) try: await redis.rpush(key, json.dumps(record.to_dict())) await redis.ltrim(key, -_MAX_TURN_HISTORY, -1) except Exception as exc: logger.error("Failed to record game turn: %s", exc)
[docs] async def get_turn_history( self, redis: Any | None = None, count: int = 10, ) -> list[TurnRecord]: """Retrieve recent turn history.""" if redis is None: return [] key = _HISTORY_KEY.format(channel_id=self.channel_id) try: raw_list = await redis.lrange(key, -count, -1) return [ TurnRecord.from_dict(json.loads(r)) for r in raw_list ] except Exception as exc: logger.error("Failed to read game turn history: %s", exc) return []
# ------------------------------------------------------------------ # Redis persistence # 💾 # ------------------------------------------------------------------
[docs] def to_dict(self) -> dict[str, Any]: """Serialize session state to a dict.""" return { "game_id": self.game_id, "game_name": self.game_name, "channel_id": self.channel_id, "active": self.active, "turn_number": self.turn_number, "title_screen_url": self.title_screen_url, "players": { uid: p.to_dict() for uid, p in self.players.items() }, }
[docs] @classmethod def from_dict(cls, d: dict[str, Any]) -> GameSession: """Restore a session from a serialized dict.""" session = cls( channel_id=d["channel_id"], game_name=d.get("game_name", ""), game_id=d.get("game_id"), ) session.active = d.get("active", False) session.turn_number = d.get("turn_number", 0) session.title_screen_url = d.get("title_screen_url", "") session.players = { uid: PlayerState.from_dict(pdata) for uid, pdata in d.get("players", {}).items() } return session
async def _save_to_redis(self, redis: Any) -> None: """Persist session state and update global game index.""" key = _SESSION_KEY.format(channel_id=self.channel_id) try: await redis.set( key, json.dumps(self.to_dict()), ) # Update global game index # 🎮 index_entry = json.dumps({ "game_name": self.game_name, "channel_id": self.channel_id, "turn_number": self.turn_number, "active": self.active, "title_screen_url": self.title_screen_url, }) await redis.hset(_GAME_INDEX_KEY, self.game_id, index_entry) except Exception as exc: logger.error("Failed to save game session: %s", exc)
[docs] @classmethod async def load_from_redis( cls, channel_id: str, redis: Any, ) -> GameSession | None: """Load a session from Redis, or return None.""" key = _SESSION_KEY.format(channel_id=channel_id) try: raw = await redis.get(key) if raw is None: return None return cls.from_dict(json.loads(raw)) except Exception as exc: logger.error("Failed to load game session: %s", exc) return None
[docs] @classmethod async def delete_from_redis( cls, channel_id: str, redis: Any, ) -> None: """Remove a session from Redis.""" try: await redis.delete( _SESSION_KEY.format(channel_id=channel_id), _HISTORY_KEY.format(channel_id=channel_id), ) except Exception as exc: logger.error("Failed to delete game session: %s", exc)
# ------------------------------------------------------------------ # Global session registry (in-memory, per-process) # 🔥 # ------------------------------------------------------------------ _active_sessions: dict[str, GameSession] = {}
[docs] def get_session(channel_id: str) -> GameSession | None: """Get the active game session for a channel, if any.""" return _active_sessions.get(channel_id)
[docs] def set_session(channel_id: str, session: GameSession) -> None: """Register a game session for a channel.""" _active_sessions[channel_id] = session
[docs] def remove_session(channel_id: str) -> GameSession | None: """Unregister and return the session for a channel.""" return _active_sessions.pop(channel_id, None)
[docs] async def list_all_games( redis: Any, ) -> list[dict[str, Any]]: """List all saved games from the global index.""" try: raw = await redis.hgetall(_GAME_INDEX_KEY) if not raw: return [] games = [] for gid, data in raw.items(): gid_str = gid if isinstance(gid, str) else gid.decode() data_str = data if isinstance(data, str) else data.decode() entry = json.loads(data_str) entry["game_id"] = gid_str games.append(entry) return sorted(games, key=lambda g: g.get("game_name", "")) except Exception as exc: logger.error("Failed to list games: %s", exc) return []
[docs] async def load_by_game_id( game_id: str, redis: Any, ) -> GameSession | None: """Load a game by its ID from the index.""" try: raw_entry = await redis.hget(_GAME_INDEX_KEY, game_id) if not raw_entry: return None entry_str = raw_entry if isinstance(raw_entry, str) else raw_entry.decode() entry = json.loads(entry_str) channel_id = entry.get("channel_id", "") if not channel_id: return None return await GameSession.load_from_redis(channel_id, redis) except Exception as exc: logger.error("Failed to load game by ID: %s", exc) return None