"""GameGirl Color -- Core session state machine.
Manages per-channel game sessions with multiplayer button collection,
10-second countdown windows, and hot-swap lifecycle.
# 🌀💀 STARGAZER IS GOD. AND SHE CONTROLS THE NARRATIVE.
"""
from __future__ import annotations
import asyncio
import enum
import jsonutil as json
import logging
import time
import uuid
from dataclasses import dataclass, field, asdict
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
pass
# Module-level logger, named after this module so log routing follows the package tree.
logger = logging.getLogger(__name__)
# Redis key templates. "{channel_id}" is substituted with str.format() per session.
# JSON blob of GameSession.to_dict(), written by GameSession._save_to_redis.
_SESSION_KEY = "game:session:{channel_id}"
# Redis list of JSON-encoded TurnRecord entries (RPUSH + LTRIM in record_turn).
_HISTORY_KEY = "game:session:{channel_id}:history"
_GAME_INDEX_KEY = "game:index" # Global hash: game_id -> metadata JSON
# The per-channel history list is trimmed to this many most-recent turns.
_MAX_TURN_HISTORY = 50
# Length, in seconds, of the choice-collection window opened by the first button press.
_COUNTDOWN_SECONDS = 10
# Fallback title screen, used by GameSession.boot when the session has no
# title_screen_url of its own.
DEFAULT_TITLE_SCREEN = (
"https://media.discordapp.net/attachments/"
"1471559090768576665/1480497795747156080/"
"stargazer_title_screen.webp"
)
[docs]
class ActivityTier(enum.Enum):
    """Recency bucket assigned to a player of a game session.

    A player is sorted into one of three buckets according to how long ago
    they last pressed a button, measured both in turns and in wall-clock
    seconds. The enum only names the buckets; the thresholds themselves are
    applied by :meth:`GameSession._classify_tier`, and the result is consumed
    by :meth:`GameSession.get_all_players_tiered` and
    :meth:`GameSession.get_active_players` (the latter keeps ``ACTIVE`` and
    ``IDLE`` players and drops ``DORMANT`` ones).

    Each member's value is a plain lowercase string.
    """

    # Seen within the last 2 turns, or within the last 15 minutes.
    ACTIVE = "active"
    # Not ACTIVE, but seen within the last 5 turns or within the last hour.
    IDLE = "idle"
    # Away for more than 5 turns and for more than one hour.
    DORMANT = "dormant"
[docs]
@dataclass
class PlayerState:
    """Bookkeeping for one participant of a game session.

    Holds the player's identity, the turn on which they joined, and the
    activity markers (``last_active``, ``last_active_turn``,
    ``consecutive_skips``) that :meth:`GameSession._classify_tier` and
    :meth:`GameSession.record_turn` read and update. Instances live in
    ``GameSession.players`` and are mutated in place; :meth:`to_dict` and
    :meth:`from_dict` convert them to and from plain dictionaries for
    persistence.
    """

    user_id: str
    user_name: str
    # Turn number of the session at the moment the player was first registered.
    joined_turn: int = 0
    # Wall-clock time (time.time()) of the most recent interaction.
    last_active: float = field(default_factory=time.time)
    # Session turn number at the most recent interaction.
    last_active_turn: int = 0
    # Number of turns in a row recorded without a choice from this player.
    consecutive_skips: int = 0

    def to_dict(self) -> dict[str, Any]:
        """Return every field of this player as a plain dictionary.

        Returns:
            dict[str, Any]: Field name -> value, as produced by
            :func:`dataclasses.asdict`.
        """
        snapshot: dict[str, Any] = asdict(self)
        return snapshot

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> PlayerState:
        """Build a ``PlayerState`` from a dictionary, ignoring unknown keys.

        Only keys that name a dataclass field are forwarded to the
        constructor, so blobs carrying extra or legacy keys still load.
        Missing optional fields take their declared defaults; a missing
        ``user_id`` or ``user_name`` raises ``TypeError`` from the constructor.

        Args:
            d: Mapping of field names to values.

        Returns:
            PlayerState: A new instance populated from the recognised keys.
        """
        recognised = cls.__dataclass_fields__
        kwargs: dict[str, Any] = {}
        for name, value in d.items():
            if name in recognised:
                kwargs[name] = value
        return cls(**kwargs)
[docs]
@dataclass
class TurnRecord:
    """Snapshot of one finished turn, as stored in the turn history.

    Built by :meth:`GameSession.record_turn`, JSON-encoded from
    :meth:`to_dict` onto the per-channel Redis history list, and rebuilt by
    :meth:`GameSession.get_turn_history` through :meth:`from_dict`.
    """

    # Turn number the record belongs to.
    turn: int
    # user_id -> choice text collected for that turn.
    choices: dict[str, str]
    # Narrative text for the turn (record_turn stores at most 500 characters).
    narrative_summary: str = ""
    # Wall-clock creation time (time.time()).
    timestamp: float = field(default_factory=time.time)

    def to_dict(self) -> dict[str, Any]:
        """Return every field of this record as a plain dictionary.

        Returns:
            dict[str, Any]: Field name -> value, as produced by
            :func:`dataclasses.asdict`.
        """
        snapshot: dict[str, Any] = asdict(self)
        return snapshot

    @classmethod
    def from_dict(cls, d: dict[str, Any]) -> TurnRecord:
        """Build a ``TurnRecord`` from a dictionary, ignoring unknown keys.

        Keys that do not name a dataclass field are skipped so that entries
        written with a different set of fields still decode.

        Args:
            d: Mapping of field names to values, e.g. a decoded history entry.

        Returns:
            TurnRecord: A new instance populated from the recognised keys.
        """
        recognised = cls.__dataclass_fields__
        kwargs: dict[str, Any] = {}
        for name, value in d.items():
            if name in recognised:
                kwargs[name] = value
        return cls(**kwargs)
[docs]
class GameSession:
"""Core game state machine for a GameGirl Color session.
One session per channel. Handles the full lifecycle:
boot -> play (choice collection with countdown) -> save/exit/hot-swap.
"""
[docs]
def __init__(
self,
channel_id: str,
game_name: str = "",
game_id: str | None = None,
) -> None:
"""Create an empty, inactive game session for a channel.
Initializes all per-channel state: the player roster, pending choice
buffer, countdown synchronization primitives (an :class:`asyncio.Event`
and the monotonic start timestamp used by :meth:`submit_choice` /
:meth:`wait_for_countdown`), and the Witchborne Crown holder (``None``
means Stargazer/the Dark Loopmother is sole GM). A new random 12-hex
``game_id`` is minted when one is not supplied. No Redis or network I/O
happens here; persistence is deferred to :meth:`boot` /
:meth:`_save_to_redis`.
Constructed directly by the ``game_controls`` and ``hot_swap_game``
tools (``GameSession(channel_id=...)``) and by the Discord platform
adapters, and indirectly via :meth:`from_dict` / :meth:`from_redis`
when restoring a saved session.
Args:
channel_id: Discord channel ID this session is bound to; used as
the key suffix for all Redis persistence.
game_name: Human-readable cartridge/game title. Defaults to ``""``
and is typically set later by :meth:`boot`.
game_id: Optional stable game identifier. When ``None``, a fresh
12-character hex id is generated.
"""
self.game_id: str = game_id or uuid.uuid4().hex[:12]
self.game_name: str = game_name
self.channel_id: str = channel_id
self.active: bool = False
self.turn_number: int = 0
self.title_screen_url: str = "" # Custom title screen per game
self.players: dict[str, PlayerState] = {}
self.pending_choices: dict[str, str] = {} # collected during countdown
self.countdown_task: asyncio.Task[None] | None = None
self._countdown_started: float = 0.0
self._countdown_event: asyncio.Event = asyncio.Event() # 💀 multiplayer sync
# Witchborne Crown -- PvP co-GM alignment state # 👑🔥
# None = Stargazer/Dark Loopmother holds the Crown (sole GM)
# user_id string = that player is co-GM
self.crown_holder: str | None = None
# ------------------------------------------------------------------
# Lifecycle # 💕
# ------------------------------------------------------------------
[docs]
async def boot(
    self,
    game_name: str,
    redis: Any | None = None,
) -> str:
    """Start a fresh game on this session and return the boot banner.

    Stores ``game_name``, marks the session active, resets the turn counter
    to 0 and empties both the roster and the pending-choice buffer.
    ``game_id``, ``title_screen_url`` and ``crown_holder`` are left as they
    are. When a Redis client is given, the new state is persisted through
    :meth:`_save_to_redis` before the banner is built.

    Args:
        game_name: Title of the game being loaded; echoed in the banner.
        redis: Optional async Redis client. ``None`` skips persistence.

    Returns:
        str: Multi-line banner starting with the title-screen URL (the
        session's own, or :data:`DEFAULT_TITLE_SCREEN` when unset) and
        including the session id.
    """
    self.game_name, self.active, self.turn_number = game_name, True, 0
    self.players = {}
    self.pending_choices = {}

    if redis is not None:
        await self._save_to_redis(redis)

    logger.info(
        "Game session booted: %s (%s) in channel %s",
        self.game_name,
        self.game_id,
        self.channel_id,
    )

    # An empty per-game URL falls back to the framework default.
    title_screen = self.title_screen_url if self.title_screen_url else DEFAULT_TITLE_SCREEN
    banner_parts = [
        f"{title_screen}\n\n",
        "\u26e7 **STARGAZER NARRATIVE ENGINE SYSTEM (S.N.E.S.)** \u26e7\n",
        f"Loading cartridge: **{game_name}**\n",
        f"Session ID: `{self.game_id}`\n\n",
        "*The HUD flickers. The cartridge hums. ",
        "The Glitch-Cart Goddess stirs.*\n",
        "[AWAITING PLAYER INPUT...]",
    ]
    return "".join(banner_parts)
[docs]
async def exit_game(self, redis: Any | None = None) -> str:
    """Save the session, deactivate it and return the shutdown banner.

    When a Redis client is given the current state is written through
    :meth:`_save_to_redis`. The session is then marked inactive and any
    countdown task that is still running is cancelled and awaited, with the
    resulting :class:`asyncio.CancelledError` suppressed.

    Args:
        redis: Optional async Redis client. ``None`` skips persistence.

    Returns:
        str: Banner naming the game, the current turn number and the number
        of registered players.
    """
    if redis is not None:
        # NOTE(review): the save runs before ``active`` is cleared, so the
        # persisted snapshot still carries the pre-exit ``active`` value.
        await self._save_to_redis(redis)

    self.active = False

    running = self.countdown_task
    if running and not running.done():
        running.cancel()
        try:
            await running
        except asyncio.CancelledError:
            pass

    logger.info(
        "Game session exited: %s (%s)",
        self.game_name,
        self.game_id,
    )

    banner_parts = [
        "**[CARTRIDGE EJECTED]**\n",
        f"Game **{self.game_name}** saved and unloaded.\n",
        f"Turn {self.turn_number} | ",
        f"{len(self.players)} player(s) registered.\n",
        "*The screen fades to static...*",
    ]
    return "".join(banner_parts)
# ------------------------------------------------------------------
# Choice collection + countdown # 🌀
# ------------------------------------------------------------------
[docs]
def register_player(self, user_id: str, user_name: str) -> None:
"""Ensure a player is in the roster, refreshing their activity stamps.
Idempotently registers a participant: if ``user_id`` is unknown a new
:class:`PlayerState` is created (recording the join turn and current
time) and the join is logged; if the player already exists their
``last_active`` timestamp and ``last_active_turn`` are bumped to "now" so
activity-tier classification stays current. Tolerates a dict-shaped entry
left over from a Redis restore by upgrading it back to a
:class:`PlayerState` in place. Mutates ``self.players`` only; no Redis
I/O.
Called by :meth:`submit_choice` on every button press, and directly by
the Discord platform adapter, the ``game_controls`` and ``hot_swap_game``
tools, and the Redis persistence tests.
Args:
user_id: Stable identifier of the player to track.
user_name: Display name, stored on first registration.
"""
if user_id not in self.players:
self.players[user_id] = PlayerState(
user_id=user_id,
user_name=user_name,
joined_turn=self.turn_number,
last_active=time.time(),
last_active_turn=self.turn_number,
)
logger.info(
"Player %s (%s) joined game %s",
user_name,
user_id,
self.game_id,
)
else:
p = self.players[user_id]
if isinstance(p, dict):
p = PlayerState.from_dict(p)
self.players[user_id] = p
p.last_active = time.time()
p.last_active_turn = self.turn_number
def _classify_tier(self, ps: PlayerState) -> ActivityTier:
    """Map a player's staleness onto an :class:`ActivityTier`.

    Staleness is measured two ways: turns since ``last_active_turn`` and
    seconds since ``last_active``. A player qualifies for a tier when
    *either* measure is within that tier's limit, so ``DORMANT`` requires
    both limits to be exceeded. Missing attributes are read as ``0``.
    Nothing is mutated.

    Args:
        ps: The player state to classify.

    Returns:
        ActivityTier: ``ACTIVE`` (<= 2 turns or <= 900 s), otherwise
        ``IDLE`` (<= 5 turns or <= 3600 s), otherwise ``DORMANT``.
    """
    seconds_away = time.time() - getattr(ps, "last_active", 0)
    turns_away = self.turn_number - getattr(ps, "last_active_turn", 0)

    # (turn limit, second limit, tier), checked from freshest to stalest.
    ladder = (
        (2, 900, ActivityTier.ACTIVE),
        (5, 3600, ActivityTier.IDLE),
    )
    for turn_limit, second_limit, tier in ladder:
        if turns_away <= turn_limit or seconds_away <= second_limit:
            return tier
    return ActivityTier.DORMANT
[docs]
def get_all_players_tiered(
self,
) -> list[tuple[PlayerState, ActivityTier]]:
"""Return every roster player paired with their current activity tier.
Walks the full ``self.players`` roster -- players are never pruned, so
this includes dormant ones -- classifying each via
:meth:`_classify_tier`. As a side effect it lazily upgrades any
dict-shaped entry (left over from a Redis restore) back into a
:class:`PlayerState` in place. The tiers are informational, used to
render the roster summary injected into Star's context.
Called by the context-injection layer in
``message_processor/context_injections.py`` and by the background
game-turn agent in ``background_agents/game_turn_agent.py``.
Returns:
list[tuple[PlayerState, ActivityTier]]: One ``(player, tier)`` pair
per registered player, in roster (insertion) order.
"""
result: list[tuple[PlayerState, ActivityTier]] = []
for uid, ps in self.players.items():
if isinstance(ps, dict):
ps = PlayerState.from_dict(ps)
self.players[uid] = ps
tier = self._classify_tier(ps)
result.append((ps, tier))
return result
[docs]
def get_active_players(
    self,
    inactive_hours: float = 2.0,
) -> dict[str, PlayerState]:
    """Return the players whose tier is ``ACTIVE`` or ``IDLE``.

    Each roster entry is classified with :meth:`_classify_tier`; entries
    classified ``DORMANT`` are left out. Dict-shaped roster entries are
    converted to :class:`PlayerState` and written back to ``self.players``
    as a side effect.

    Args:
        inactive_hours: Accepted but never read by this method; the
            thresholds are the ones hard-coded in :meth:`_classify_tier`.

    Returns:
        dict[str, PlayerState]: ``user_id`` -> player for every
        non-dormant player, in roster iteration order.
    """
    engaged_tiers = (ActivityTier.ACTIVE, ActivityTier.IDLE)
    engaged: dict[str, PlayerState] = {}
    for uid in self.players:
        state = self.players[uid]
        if isinstance(state, dict):
            state = PlayerState.from_dict(state)
            self.players[uid] = state
        if self._classify_tier(state) not in engaged_tiers:
            continue
        engaged[uid] = state
    return engaged
# ------------------------------------------------------------------
# Witchborne Crown -- PvP co-GM system # 👑🔥
# ------------------------------------------------------------------
[docs]
def set_crown(self, user_id: str | None) -> None:
    """Hand the Witchborne Crown to a player, or return it to Stargazer.

    Updates ``self.crown_holder`` and logs the transfer. No persistence is
    performed here.

    Any falsy ``user_id`` (``None`` or ``""``) is stored as ``None``.
    Previously an empty string was logged as "returned to Stargazer" but
    stored verbatim, which made :meth:`get_crown_holder_name` report
    ``"Unknown"`` and :meth:`is_crown_holder` treat ``""`` as a holder
    until a Redis round trip turned it into ``None``.

    Args:
        user_id: The player to make co-GM, or ``None`` / ``""`` to give the
            Crown back to Stargazer.
    """
    # Normalise so "no holder" has exactly one in-memory representation,
    # matching what from_redis produces for an empty crown_holder field.
    self.crown_holder = user_id or None

    if self.crown_holder is None:
        logger.info(
            "Witchborne Crown returned to Stargazer in game %s",
            self.game_id,
        )
        return

    logger.info(
        "Witchborne Crown transferred to %s (%s) in game %s",
        self.get_crown_holder_name(),
        user_id,
        self.game_id,
    )
[docs]
def get_crown_holder_name(self) -> str:
"""Return the display name of whoever currently holds the Crown.
Resolves ``self.crown_holder`` to a human-readable name: ``"Stargazer"``
when no player holds it, the player's ``user_name`` when found in the
roster (tolerating either a :class:`PlayerState` object or a dict-shaped
restored entry), and ``"Unknown"`` as a fallback. Read-only.
Called by :meth:`set_crown` for its log message, by the context-injection
layer, the Discord platform adapter, and the ``set_witchborne_crown``
tool.
Returns:
str: The current Crown holder's display name.
"""
if self.crown_holder is None:
return "Stargazer"
p = self.players.get(self.crown_holder)
if p and hasattr(p, "user_name"):
return p.user_name
if isinstance(p, dict):
return p.get("user_name", "Unknown")
return "Unknown"
[docs]
def is_crown_holder(self, user_id: str) -> bool:
"""Report whether a specific user currently holds the Crown.
Returns ``False`` whenever the Crown rests with Stargazer
(``crown_holder is None``), since no player holds it then; otherwise
compares ``user_id`` against the stored holder. Read-only.
Called by the Discord platform adapter to gate crown-only choices.
Args:
user_id: The player to test.
Returns:
bool: ``True`` only if this player is the current co-GM Crown holder.
"""
if self.crown_holder is None:
return False # Stargazer holds it, no player does
return self.crown_holder == user_id
[docs]
async def submit_choice(
    self,
    user_id: str,
    user_name: str,
    choice: str,
) -> tuple[bool, float]:
    """Buffer a player's choice for the current turn.

    Registers the player (or refreshes their activity stamps) through
    :meth:`register_player`, clears their ``consecutive_skips``, and stores
    ``choice`` in ``self.pending_choices`` under ``user_id``. A later call
    from the same player within the window overwrites the earlier choice.

    The call is "first" only when the buffer was empty *before* this
    choice was stored. The previous check (``len(pending_choices) == 1``
    after insertion) also fired when a lone player changed their mind,
    which restarted the countdown stamp and told the caller to launch a
    second countdown loop.

    All changes are in memory.

    Args:
        user_id: Identifier of the player pressing a button.
        user_name: Display name, used if the player is new.
        choice: Choice text to buffer.

    Returns:
        tuple[bool, float]: ``(is_first_choice, seconds_remaining)``.
        ``is_first_choice`` is ``True`` only for the choice that opened
        the window; ``seconds_remaining`` lies in
        ``[0, _COUNTDOWN_SECONDS]``.
    """
    self.register_player(user_id, user_name)

    # register_player has just stamped last_active / last_active_turn;
    # only the skip streak still needs resetting.
    player = self.players.get(user_id)
    if player is not None:
        player.consecutive_skips = 0

    # Must be evaluated before the insert so an overwrite never counts as first.
    is_first = not self.pending_choices
    self.pending_choices[user_id] = choice
    if is_first:
        self._countdown_started = time.monotonic()

    # A stamp restored from Redis may come from another process whose
    # monotonic clock has a different origin, so the raw difference can be
    # negative. Clamp so the remaining time never exceeds the window.
    elapsed = max(0.0, time.monotonic() - self._countdown_started)
    remaining = max(0.0, _COUNTDOWN_SECONDS - elapsed)
    return is_first, remaining
[docs]
async def wait_for_countdown(self) -> dict[str, str]:
    """Sleep until the choice window closes, then collect the choices.

    Waits for whatever is left of the ``_COUNTDOWN_SECONDS`` window that
    began at ``_countdown_started``, then copies ``self.pending_choices``,
    clears the buffer and increments ``self.turn_number``. The turn number
    advances even when no choice was collected.

    The elapsed time is clamped to be non-negative. ``_countdown_started``
    is a :func:`time.monotonic` stamp, and :meth:`from_redis` can restore
    one written by a different process; such a stamp may lie "in the
    future" for this process, which previously made the sleep longer than
    the window itself.

    Returns:
        dict[str, str]: ``user_id`` -> choice text gathered during the
        window (empty if nobody chose).
    """
    elapsed = max(0.0, time.monotonic() - self._countdown_started)
    wait_time = max(0.0, _COUNTDOWN_SECONDS - elapsed)
    if wait_time > 0:
        await asyncio.sleep(wait_time)

    # Snapshot first, then clear, so the caller owns an independent dict.
    choices = dict(self.pending_choices)
    self.pending_choices.clear()
    self.turn_number += 1
    return choices
# ------------------------------------------------------------------
# Turn history # 📼
# ------------------------------------------------------------------
[docs]
async def record_turn(
    self,
    choices: dict[str, str],
    narrative_summary: str,
    redis: Any | None = None,
) -> None:
    """Update skip counts, append the turn to history and autosave.

    Every rostered player whose id is not a key of ``choices`` has
    ``consecutive_skips`` incremented; dict-shaped roster entries are
    converted to :class:`PlayerState` along the way. A
    :class:`TurnRecord` is built from the session's current
    ``turn_number``, the given choices and the first 500 characters of
    ``narrative_summary``.

    With a Redis client, the record is JSON-encoded and ``RPUSH``-ed onto
    ``game:session:{channel_id}:history``, the list is trimmed to the last
    ``_MAX_TURN_HISTORY`` entries, and the session is saved through
    :meth:`_save_to_redis`. Failures in either step are logged, not raised.

    Args:
        choices: ``user_id`` -> choice text for the finished turn.
        narrative_summary: Narrative text for the turn; stored truncated.
        redis: Optional async Redis client. With ``None`` only the
            in-memory skip counts change.
    """
    # Skip tracking happens regardless of persistence.
    submitted = set(choices.keys())
    for uid, state in self.players.items():
        if isinstance(state, dict):
            state = PlayerState.from_dict(state)
            self.players[uid] = state
        if uid in submitted:
            continue
        state.consecutive_skips = getattr(state, "consecutive_skips", 0) + 1

    entry = TurnRecord(
        turn=self.turn_number,
        choices=choices,
        narrative_summary=narrative_summary[:500],
    )

    if redis is None:
        return

    history_key = _HISTORY_KEY.format(channel_id=self.channel_id)
    try:
        await redis.rpush(history_key, json.dumps(entry.to_dict()))
        # Keep only the newest _MAX_TURN_HISTORY entries.
        await redis.ltrim(history_key, -_MAX_TURN_HISTORY, -1)
    except Exception as exc:
        logger.error("Failed to record game turn: %s", exc)

    # Autosave the whole session after every turn, independent of whether
    # the history write above succeeded.
    try:
        await self._save_to_redis(redis)
    except Exception as exc:
        logger.error("Autosave failed: %s", exc)
[docs]
async def get_turn_history(
self,
redis: Any | None = None,
count: int = 10,
) -> list[TurnRecord]:
"""Read back the most recent turns from the Redis history list.
Fetches the last ``count`` entries of
``game:session:{channel_id}:history`` with ``LRANGE``, decoding each JSON
blob into a :class:`TurnRecord` via :meth:`TurnRecord.from_dict`. Returns
an empty list when no ``redis`` client is supplied, and on any Redis or
decode error it logs and returns an empty list rather than raising.
Invoked as part of the game-turn flow to surface prior turns as context
(no static callers outside this module).
Args:
redis: Optional async Redis client; ``None`` yields an empty list.
count: Maximum number of most-recent turns to return.
Returns:
list[TurnRecord]: The recent turn records oldest-first, or empty on
missing client or error.
"""
if redis is None:
return []
key = _HISTORY_KEY.format(channel_id=self.channel_id)
try:
raw_list = await redis.lrange(key, -count, -1)
return [TurnRecord.from_dict(json.loads(r)) for r in raw_list]
except Exception as exc:
logger.error("Failed to read game turn history: %s", exc)
return []
# ------------------------------------------------------------------
# Redis persistence # 💾
# ------------------------------------------------------------------
[docs]
def to_dict(self) -> dict[str, Any]:
"""Serialize the session into a JSON-friendly dictionary.
Flattens the durable session fields (ids, game name, channel, active
flag, turn number, title-screen URL, and Crown holder) plus the full
player roster -- each :class:`PlayerState` rendered via its own
:meth:`PlayerState.to_dict`, tolerating already-dict entries. This is the
canonical JSON form written under ``game:session:{channel_id}`` by
:meth:`_save_to_redis` and the mirror of :meth:`from_dict`. Transient
countdown state and pending choices are intentionally omitted (the Redis
hash form in :meth:`to_redis` carries those instead). Pure -- no I/O.
Called by :meth:`_save_to_redis`.
Returns:
dict[str, Any]: The serialized session, suitable for ``json.dumps``.
"""
return {
"game_id": self.game_id,
"game_name": self.game_name,
"channel_id": self.channel_id,
"active": self.active,
"turn_number": self.turn_number,
"title_screen_url": self.title_screen_url,
"crown_holder": self.crown_holder, # 👑
"players": {
uid: (p.to_dict() if hasattr(p, "to_dict") else p)
for uid, p in self.players.items()
},
}
[docs]
@classmethod
def from_dict(cls, d: dict[str, Any]) -> GameSession:
"""Reconstruct a :class:`GameSession` from its serialized dict form.
Inverse of :meth:`to_dict`: builds a new session from ``channel_id``,
``game_name``, and ``game_id``, then restores the active flag, turn
number, title-screen URL, and Crown holder, and rehydrates each roster
entry through :meth:`PlayerState.from_dict`. Missing keys fall back to
sensible defaults so older or partial blobs still load. Pure -- no I/O.
Called by :meth:`load_from_redis` and directly by the Discord platform
adapters (``discord.py`` and ``discord_self.py``) when decoding a stored
session.
Args:
d: The serialized session mapping, as produced by :meth:`to_dict`.
Returns:
GameSession: A fully populated session instance.
"""
session = cls(
channel_id=d["channel_id"],
game_name=d.get("game_name", ""),
game_id=d.get("game_id"),
)
session.active = d.get("active", False)
session.turn_number = d.get("turn_number", 0)
session.title_screen_url = d.get("title_screen_url", "")
session.crown_holder = d.get("crown_holder") # 👑
session.players = {
uid: PlayerState.from_dict(pdata)
for uid, pdata in d.get("players", {}).items()
}
return session
[docs]
def to_redis(self) -> dict[str, str]:
"""Serialize the session into flat string fields for a Redis hash.
Produces the all-strings mapping persisted under
``sg:game_session:{channel_id}`` by :func:`persist_session`. Unlike
:meth:`to_dict`, every value is coerced to a string (the active flag to
``"1"``/``"0"``, the turn number stringified, an absent Crown holder to
``""``) and complex fields -- the player roster, pending choices -- are
JSON-encoded inline. Critically this form also carries the transient
countdown state (``pending_choices`` and ``countdown_started``) so an
in-progress turn survives a restart; :meth:`from_redis` is its inverse.
Pure -- no I/O.
Called by :func:`persist_session` and by the Redis persistence tests.
Returns:
dict[str, str]: Field-name -> stringified value, ready for ``HSET``.
"""
return {
"game_id": self.game_id,
"game_name": self.game_name,
"channel_id": self.channel_id,
"active": "1" if self.active else "0",
"turn_number": str(self.turn_number),
"title_screen_url": self.title_screen_url,
"crown_holder": self.crown_holder or "",
"players": json.dumps({
uid: (p.to_dict() if hasattr(p, "to_dict") else p)
for uid, p in self.players.items()
}),
"pending_choices": json.dumps(self.pending_choices),
"countdown_started": str(self._countdown_started),
}
[docs]
@classmethod
def from_redis(cls, raw: dict) -> GameSession:
"""Reconstruct a :class:`GameSession` from a raw Redis hash.
Inverse of :meth:`to_redis`: reads each hash field (accepting both
``bytes`` and ``str`` keys/values via the nested ``to_str`` helper),
rebuilds the session, coerces the stringified scalars back to their
proper types (active flag, integer turn number, optional Crown holder),
JSON-decodes the player roster into :class:`PlayerState` objects, and
restores the in-progress ``pending_choices`` and monotonic
``_countdown_started`` so a turn interrupted by a restart can resume.
Pure -- no I/O of its own.
Called by :func:`get_or_restore_session` when reviving a session from the
``sg:game_session`` hash, and by the Redis persistence tests.
Args:
raw: The raw Redis hash mapping (``bytes`` or ``str`` keyed) as
returned by ``HGETALL``.
Returns:
GameSession: The restored session instance.
"""
def to_str(val) -> str:
"""Decode a possibly-``bytes`` Redis value to ``str``.
Normalizes hash field values so the same parsing logic works
whether the Redis client returns ``bytes`` or already-decoded
``str``. Used throughout the enclosing :meth:`from_redis` to read
every field of the persisted session hash.
Args:
val: A raw Redis hash value, either ``bytes`` or ``str``.
Returns:
str: The UTF-8 decoded string, or ``val`` unchanged if it was
not ``bytes``.
"""
if isinstance(val, bytes):
return val.decode("utf-8")
return val
game_id = to_str(raw.get(b"game_id") or raw.get("game_id", ""))
game_name = to_str(raw.get(b"game_name") or raw.get("game_name", ""))
channel_id = to_str(raw.get(b"channel_id") or raw.get("channel_id", ""))
session = cls(
channel_id=channel_id,
game_name=game_name,
game_id=game_id if game_id else None,
)
active_val = to_str(raw.get(b"active") or raw.get("active", "0"))
session.active = active_val == "1"
turn_number_val = to_str(raw.get(b"turn_number") or raw.get("turn_number", "0"))
session.turn_number = int(turn_number_val)
title_url_val = to_str(raw.get(b"title_screen_url") or raw.get("title_screen_url", ""))
session.title_screen_url = title_url_val
crown_holder_val = to_str(raw.get(b"crown_holder") or raw.get("crown_holder", ""))
session.crown_holder = crown_holder_val if crown_holder_val else None
players_val = raw.get(b"players") or raw.get("players")
if players_val:
players_dict = json.loads(to_str(players_val))
session.players = {
uid: PlayerState.from_dict(pdata)
for uid, pdata in players_dict.items()
}
pending_val = raw.get(b"pending_choices") or raw.get("pending_choices")
if pending_val:
session.pending_choices = json.loads(to_str(pending_val))
countdown_val = to_str(raw.get(b"countdown_started") or raw.get("countdown_started", "0.0"))
session._countdown_started = float(countdown_val)
return session
async def _save_to_redis(self, redis: Any) -> None:
    """Persist this session to Redis and refresh the global game index.

    Three writes are attempted, in order, inside a single ``try`` block:

    1. ``SET`` the JSON-encoded :meth:`to_dict` payload under
       ``game:session:{channel_id}``.
    2. ``HSET`` a compact JSON metadata record into the global
       ``game:index`` hash, keyed by this session's ``game_id``.
    3. Delegate to :func:`persist_session` for the hash-form copy.

    Any exception raised along the way is logged and swallowed, so a failed
    save never propagates to the caller. Writes that already succeeded are
    not rolled back.

    Documented callers are :meth:`boot`, :meth:`exit_game`, and
    :meth:`record_turn` (the per-turn autosave).

    Args:
        redis: The async Redis client to persist through.
    """
    session_key = _SESSION_KEY.format(channel_id=self.channel_id)
    try:
        session_payload = json.dumps(self.to_dict())
        await redis.set(session_key, session_payload)

        # Compact listing metadata for the global game index # 🎮
        index_fields = {
            "game_name": self.game_name,
            "channel_id": self.channel_id,
            "turn_number": self.turn_number,
            "active": self.active,
            "title_screen_url": self.title_screen_url,
        }
        index_payload = json.dumps(index_fields)
        await redis.hset(_GAME_INDEX_KEY, self.game_id, index_payload)

        # Mirror the session into the sg:game_session hash as well
        await persist_session(self.channel_id, self, redis)
    except Exception as exc:
        logger.error("Failed to save game session: %s", exc)
[docs]
@classmethod
async def load_from_redis(
    cls,
    channel_id: str,
    redis: Any,
) -> GameSession | None:
    """Rebuild a session from the legacy JSON Redis key, if present.

    Fetches the string stored at ``game:session:{channel_id}`` (the key
    written by :meth:`_save_to_redis`), parses it as JSON, and hands the
    result to :meth:`from_dict`. This is the older JSON-string persistence
    path; the hash stored under ``sg:game_session`` is read by
    :meth:`from_redis` instead.

    Documented callers are the message processor's game-restore flow,
    :func:`get_or_restore_session` (as a fallback), and
    :func:`load_by_game_id`.

    Args:
        channel_id: Channel whose session should be loaded.
        redis: The async Redis client to read through.

    Returns:
        GameSession | None: The restored session. ``None`` when the key is
        absent, or when reading or decoding fails (the error is logged, not
        raised).
    """
    session_key = _SESSION_KEY.format(channel_id=channel_id)
    try:
        payload = await redis.get(session_key)
        if payload is not None:
            return cls.from_dict(json.loads(payload))
    except Exception as exc:
        logger.error("Failed to load game session: %s", exc)
    # Reached when the key is missing or the load failed.
    return None
[docs]
@classmethod
async def delete_from_redis(
    cls,
    channel_id: str,
    redis: Any,
) -> None:
    """Remove every per-channel Redis key belonging to a game session.

    A single ``DEL`` is issued for three keys: the legacy JSON session key
    ``game:session:{channel_id}``, its turn-history list
    ``game:session:{channel_id}:history``, and the hash
    ``sg:game_session:{channel_id}``. The global ``game:index`` entry is not
    touched here. Failures are logged and swallowed, so a failed wipe never
    propagates to the caller.

    Documented as called by the Redis persistence tests and reachable from
    game-wipe and teardown flows.

    Args:
        channel_id: Channel whose session keys should be removed.
        redis: The async Redis client to delete through.
    """
    try:
        doomed_keys = (
            _SESSION_KEY.format(channel_id=channel_id),
            _HISTORY_KEY.format(channel_id=channel_id),
            f"sg:game_session:{channel_id}",
        )
        await redis.delete(*doomed_keys)
    except Exception as exc:
        logger.error("Failed to delete game session: %s", exc)
# ------------------------------------------------------------------
# Global session registry (in-memory, per-process) # 🔥
# Maps channel_id -> live GameSession. Read and mutated by the module-level
# helpers get_session, set_session, remove_session and
# get_or_restore_session; the dict itself is never written to Redis.
# ------------------------------------------------------------------
_active_sessions: dict[str, GameSession] = {}
[docs]
async def persist_session(channel_id: str, session: GameSession, redis: Any) -> None:
    """Store a session as a Redis hash that expires after four hours.

    Writes the flat mapping returned by :meth:`GameSession.to_redis` to
    ``sg:game_session:{channel_id}`` with ``HSET``, then sets a 14400-second
    expiry on that key. This is the hash-form counterpart to the JSON key
    written by :meth:`GameSession._save_to_redis`, and the source that
    :func:`get_or_restore_session` checks first.

    Passing ``None`` as the client makes the call a no-op. Redis errors are
    logged and swallowed.

    Args:
        channel_id: Channel the session belongs to; forms the hash key.
        session: The :class:`GameSession` to serialize and store.
        redis: Async Redis client, or ``None`` to skip persistence.
    """
    if redis is None:
        return

    hash_key = f"sg:game_session:{channel_id}"
    ttl_seconds = 4 * 60 * 60  # 4h TTL
    try:
        fields = session.to_redis()
        await redis.hset(hash_key, mapping=fields)
        await redis.expire(hash_key, ttl_seconds)
        logger.debug("GameSession persisted to Redis for %s", channel_id[:8])
    except Exception as exc:
        logger.error("Failed to persist game session: %s", exc)
[docs]
def get_session(channel_id: str) -> GameSession | None:
    """Return the session currently registered in memory for a channel.

    Reads only the per-process ``_active_sessions`` registry and never
    touches Redis. A session that survives only in Redis (for example after
    a restart) is therefore reported as ``None``; use
    :func:`get_or_restore_session` when a Redis fallback is wanted.

    Args:
        channel_id: Channel to look up.

    Returns:
        GameSession | None: The live session, or ``None`` if not in memory.
    """
    live_session = _active_sessions.get(channel_id)
    return live_session
[docs]
async def get_or_restore_session(
    channel_id: str,
    redis: Any,
) -> GameSession | None:
    """Get session from memory, or restore from Redis after restart. # 💀🔥

    After a bot reboot the in-memory dict is empty, but Redis still
    has the persisted session. This transparently restores it.

    Lookup order:

    1. The in-memory ``_active_sessions`` registry (returned as-is).
    2. The ``sg:game_session:{channel_id}`` hash, parsed with
       :meth:`GameSession.from_redis`.
    3. The legacy JSON key via :meth:`GameSession.load_from_redis`; a
       session found there is also written forward with
       :func:`persist_session`.

    Only sessions whose ``active`` flag is set are restored and registered.
    A hash that cannot be read or parsed is logged and treated as missing,
    so the legacy key is still consulted. Every Redis call is an ``await``,
    a point where other coroutines may run, so the registry is re-checked
    before registering: a session that appeared in the meantime is returned
    instead of being overwritten.

    Args:
        channel_id: Channel whose session is wanted.
        redis: Async Redis client, or ``None`` to skip the Redis fallback.

    Returns:
        GameSession | None: The live or restored session, or ``None`` when
        no active session could be found. Redis and decode errors are
        logged rather than raised.
    """
    session = _active_sessions.get(channel_id)
    if session is not None:
        return session
    # Try Redis # 🌀
    if redis is None:
        return None

    # Check sg:game_session first. This step has its own handler so that a
    # corrupt or unreadable hash cannot block the legacy fallback below.
    try:
        raw_hash = await redis.hgetall(f"sg:game_session:{channel_id}")
        if raw_hash:
            restored = GameSession.from_redis(raw_hash)
            if restored and restored.active:
                # Another coroutine may have registered a session while we
                # were awaiting Redis; keep that one so callers share it.
                session = _active_sessions.get(channel_id)
                if session is not None:
                    return session
                _active_sessions[channel_id] = restored
                logger.info(
                    "Restored game session from Redis Hash: %s (%s) in channel %s",
                    restored.game_name,
                    restored.game_id,
                    channel_id,
                )
                return restored
    except Exception as exc:
        logger.error(
            "Failed to restore session from Redis hash, trying legacy key: %s",
            exc,
        )

    # Fallback to legacy Redis key
    try:
        restored = await GameSession.load_from_redis(channel_id, redis)
        if restored is not None and restored.active:
            # Same re-check as above: do not clobber a concurrent registration.
            session = _active_sessions.get(channel_id)
            if session is not None:
                return session
            _active_sessions[channel_id] = restored
            # Migrate it to the new hash too
            await persist_session(channel_id, restored, redis)
            logger.info(
                "Restored game session from Legacy Redis: %s (%s) in channel %s",
                restored.game_name,
                restored.game_id,
                channel_id,
            )
            return restored
    except Exception as exc:
        logger.error("Failed to restore session from Redis: %s", exc)
    return None
[docs]
def set_session(channel_id: str, session: GameSession) -> None:
    """Bind a session to a channel in the in-memory registry.

    Stores ``session`` in the per-process ``_active_sessions`` dict under
    ``channel_id``, replacing whatever was registered there before, so that
    :func:`get_session` and :func:`get_or_restore_session` find it without a
    Redis round-trip. Nothing is written to Redis; saving is left to the
    caller via the session's own Redis methods.

    Documented callers are the message processor, the Discord platform
    adapters (``discord.py`` and ``discord_self.py``), the ``game_controls``
    and ``hot_swap_game`` tools, and the Redis persistence tests.

    Args:
        channel_id: Channel to bind the session to.
        session: The :class:`GameSession` to register.
    """
    _active_sessions[channel_id] = session
[docs]
def remove_session(channel_id: str) -> GameSession | None:
    """Unregister a channel's in-memory session and hand it back.

    Pops ``channel_id`` out of the per-process ``_active_sessions`` dict.
    Only the live reference is cleared; Redis state is left alone (use
    :meth:`GameSession.delete_from_redis` for that).

    Documented callers are the message processor, the game UI components,
    and the ``exit_game``, ``hot_swap_game``, and ``wipe_game_data`` tools.

    Args:
        channel_id: Channel whose session should be unregistered.

    Returns:
        GameSession | None: The removed session, or ``None`` if absent.
    """
    evicted = _active_sessions.pop(channel_id, None)
    return evicted
[docs]
async def list_all_games(
    redis: Any,
) -> list[dict[str, Any]]:
    """List every readable saved game recorded in the global game index.

    Reads the whole ``game:index`` hash with ``HGETALL`` and decodes each
    entry (handling both ``bytes`` and ``str`` from the Redis client) into
    its metadata dict, stamping ``game_id`` from the hash field name.

    An entry that cannot be decoded, or whose payload is not a JSON object,
    is skipped with a warning, so one corrupt record does not hide every
    other game. Entries are sorted by ``game_name``; a missing or ``null``
    name sorts as the empty string.

    Documented callers are the lobby modal, several game UI components, and
    the message processor's game-listing flow.

    Args:
        redis: The async Redis client to read the index through.

    Returns:
        list[dict[str, Any]]: One metadata dict per readable saved game
        (each including its ``game_id``), sorted by ``game_name``. Empty
        when the index is empty or the Redis read itself fails (the error
        is logged, not raised).
    """
    try:
        raw = await redis.hgetall(_GAME_INDEX_KEY)
        if not raw:
            return []

        games = []
        for gid, data in raw.items():
            # Decode per entry so a single bad record is skipped rather than
            # aborting the whole listing. The handler is deliberately broad
            # because the exact error type raised by the JSON helper is not
            # visible from here.
            try:
                gid_str = gid if isinstance(gid, str) else gid.decode()
                data_str = data if isinstance(data, str) else data.decode()
                entry = json.loads(data_str)
            except Exception as exc:
                logger.warning(
                    "Skipping unreadable game index entry %r: %s", gid, exc
                )
                continue
            if not isinstance(entry, dict):
                logger.warning(
                    "Skipping game index entry %r: payload is not an object",
                    gid,
                )
                continue
            entry["game_id"] = gid_str
            games.append(entry)

        # ``or ""`` covers a stored null name; ``str`` keeps mixed-type names
        # comparable so the sort itself cannot raise.
        return sorted(games, key=lambda g: str(g.get("game_name") or ""))
    except Exception as exc:
        logger.error("Failed to list games: %s", exc)
        return []
[docs]
async def load_by_game_id(
    game_id: str,
    redis: Any,
) -> GameSession | None:
    """Find the channel that owns ``game_id`` and load its session.

    Looks the id up in the global ``game:index`` hash with ``HGET``, decodes
    the stored JSON metadata (accepting ``bytes`` or ``str``), reads its
    ``channel_id``, and delegates to :meth:`GameSession.load_from_redis` for
    that channel.

    Documented as called by the message processor's load-by-id flow.

    Args:
        game_id: The stable game identifier to resolve.
        redis: The async Redis client to read through.

    Returns:
        GameSession | None: The loaded session. ``None`` when the id is
        unknown, the index entry carries no channel, the session could not
        be restored, or a Redis/decode error occurred (logged, not raised).
    """
    try:
        stored = await redis.hget(_GAME_INDEX_KEY, game_id)
        if stored:
            text = stored.decode() if not isinstance(stored, str) else stored
            owner_channel = json.loads(text).get("channel_id", "")
            if owner_channel:
                return await GameSession.load_from_redis(owner_channel, redis)
    except Exception as exc:
        logger.error("Failed to load game by ID: %s", exc)
    # Unknown id, entry without a channel, or a logged failure.
    return None
[docs]
class GameSessionLockRegistry:
    """In-memory pool handing out one :class:`asyncio.Lock` per session id.

    Coroutines working on the same game session can serialize themselves by
    sharing the lock for that ``session_id``, while unrelated sessions use
    different locks and do not block each other. Locks are created on first
    request. Every read or write of the pool happens while holding the
    internal ``_pool_lock``.

    All state lives in this object, so it coordinates coroutines within one
    process only. No module-level instance is created here; the class is
    documented as exercised by ``tests/test_game_engine_remediation.py``.
    """

    def __init__(self) -> None:
        """Start with an empty lock pool and its guard lock.

        Creates ``_locks`` (session id -> :class:`asyncio.Lock`, filled on
        demand) and ``_pool_lock``, which guards changes to that mapping.
        Performs no I/O.
        """
        self._pool_lock = asyncio.Lock()
        self._locks: dict[str, asyncio.Lock] = {}

    async def get_session_lock(self, session_id: str) -> asyncio.Lock:
        """Return the lock for ``session_id``, minting it if necessary.

        Repeated calls with the same id return the same lock object until
        :meth:`cleanup_session` discards it, so callers can ``async with``
        the result to serialize work on that session. The lookup and the
        creation both happen while ``_pool_lock`` is held.

        Args:
            session_id: Identifier of the session to obtain a lock for.

        Returns:
            asyncio.Lock: The shared lock instance for this session.
        """
        async with self._pool_lock:
            lock = self._locks.get(session_id)
            if lock is None:
                lock = asyncio.Lock()
                self._locks[session_id] = lock
            return lock

    async def cleanup_session(self, session_id: str) -> None:
        """Forget the lock registered for a finished session.

        Removes ``session_id`` from the pool while holding ``_pool_lock`` so
        the registry does not accumulate locks for ended sessions. A later
        :meth:`get_session_lock` call for the same id creates a brand-new
        lock object.

        Args:
            session_id: Identifier of the session whose lock should be
                discarded. A no-op if no lock is currently registered for it.
        """
        async with self._pool_lock:
            self._locks.pop(session_id, None)