"""GameTurnAgent -- roster-coverage validator for game choice buttons.
Subscribes to ``game:turn:complete`` pub/sub events. Proofreading of
individual choices (strip markdown, trim emojis, truncate labels) is now
done inline by the renderer; the helper ``_proofread_choice`` /
``_first_emoji`` functions in this module remain available but the agent's
runtime job is validation only. When a game turn completes, this agent:
1. Reads any still-present draft choice blocks from Redis
(``game:choices:draft:{channel_id}``) to see which players were covered
2. Validates roster coverage against the session:
- All ACTIVE/IDLE players must have choices
- DORMANT players should not have choices
3. Pings Star via a system message if active/idle players are missing
choices
If the draft key was already consumed by the renderer, the agent trusts
the renderer and skips validation.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import re
import time
from typing import Any
logger = logging.getLogger(__name__)
TURN_COMPLETE_CHANNEL = "game:turn:complete"
# Seconds to wait for Star's choice drafts to appear in Redis
_DRAFT_WAIT_SECONDS = 5.0
_DRAFT_POLL_INTERVAL = 0.5
def _first_emoji(text: str) -> str:
"""Extract the first emoji from a string.
Handles multi-codepoint emoji (skin tones, ZWJ sequences) by
returning the first complete grapheme cluster that looks emoji-like.
Falls back to first 2 chars if no emoji pattern matches.
"""
# Match common emoji patterns including ZWJ sequences
m = re.match(
r"[\U0001f300-\U0001faff\U00002702-\U000027b0"
r"\U0000fe00-\U0000fe0f\U0000200d\U0001f3fb-\U0001f3ff]+",
text,
)
if m:
return m.group(0)
return text[:2] if text else "\U0001f300"
def _proofread_choice(choice: dict[str, str]) -> dict[str, str]:
"""Normalize one game-turn choice block's emoji and label.
Cleans a single ``{"emoji", "label"}`` choice for display: strips markdown
emphasis (bold/italic/underline/strikethrough/inline-code) from the label,
reduces the emoji to its first glyph via :func:`_first_emoji`, collapses
whitespace/newlines, and truncates the label to 70 characters. Pure -- returns
a new dict and does not mutate its input.
Applied per choice by :class:`GameTurnAgent` while proofreading the draft
choices for a completed turn.
Args:
choice: A choice mapping with ``emoji`` and ``label`` keys.
Returns:
dict[str, str]: A cleaned ``{"emoji", "label"}`` dict.
"""
emoji = choice.get("emoji", "")
label = choice.get("label", "")
# Strip markdown from labels
label = re.sub(r"\*\*(.+?)\*\*", r"\1", label) # bold
label = re.sub(r"__(.+?)__", r"\1", label) # underline
label = re.sub(r"`(.+?)`", r"\1", label) # backticks
label = re.sub(r"~~(.+?)~~", r"\1", label) # strikethrough
label = re.sub(r"\*(.+?)\*", r"\1", label) # italic
# Trim to first emoji only
emoji = _first_emoji(emoji)
# Strip newlines, normalize whitespace, truncate
label = label.replace("\n", " ").strip()
label = re.sub(r"\s{2,}", " ", label)
label = label[:70]
return {"emoji": emoji, "label": label}
[docs]
class GameTurnAgent:
"""Background agent that proofreads and validates game-turn choices.
Subscribes to the ``game:turn:complete`` Redis pub/sub channel and, for each
completed turn, cleans the channel's draft choices (via
:func:`_proofread_choice`) and checks that every active player has a choice --
pinging Star through the Discord platform when coverage is missing.
Constructed by the agents service bootstrap in ``agents_main.py`` and driven
by its long-lived listener task (see :meth:`start`/:meth:`_listen`); the Redis
client is injected at :meth:`start` time.
"""
[docs]
def __init__(self, discord_platform: Any = None) -> None:
"""Initialize the roster-coverage validator agent.
Stores the Discord platform handle (used by :meth:`_ping_star` to send a
system message into a channel when active players are missing choices) and
sets the listener task slot and running flag to their idle defaults. The
Redis client is not supplied here; it is injected later in :meth:`start`.
No subscriptions or I/O happen at construction time.
Constructed by the agents service bootstrap in ``agents_main.py``.
Args:
discord_platform: Discord platform object exposing a ``client`` that
can resolve and send to channels. When ``None``, :meth:`_ping_star`
logs a warning and skips the ping.
"""
self._discord = discord_platform
self._task: asyncio.Task[None] | None = None
self._running = False
[docs]
async def start(self, redis: Any) -> None:
"""Start the pub/sub listener that validates completed game turns.
Stores the injected async Redis client, marks the agent running, and
spawns the long-lived :meth:`_listen` coroutine as a named
``game-turn-agent`` :class:`asyncio.Task`. This is the agent's entry
point: once started it watches the ``game:turn:complete`` channel and
reacts to each turn-completion event. No subscription happens until the
task runs, so this returns immediately after scheduling.
Called by the agents service bootstrap in ``agents_main.py`` (the
``AgentsService`` start path), which constructs the agent and then
awaits ``start(redis)``.
Args:
redis: An async Redis client used both for pub/sub subscription and
for reading the per-channel draft-choice list during validation.
Returns:
None.
"""
self._running = True
self._redis = redis
self._task = asyncio.create_task(self._listen(), name="game-turn-agent")
logger.info("GameTurnAgent started")
[docs]
async def stop(self) -> None:
"""Stop the agent and tear down its listener task.
Clears the running flag so :meth:`_listen` exits its loop, then cancels
the background listener task and awaits it, swallowing the resulting
:class:`asyncio.CancelledError`. Used for clean shutdown so the pub/sub
subscription and its task do not leak when the service stops.
Called by the agents service shutdown path in ``agents_main.py`` (the
``AgentsService`` stop/cleanup sequence).
Returns:
None.
"""
self._running = False
if self._task and not self._task.done():
self._task.cancel()
try:
await self._task
except asyncio.CancelledError:
pass
logger.info("GameTurnAgent stopped")
async def _listen(self) -> None:
"""Subscribe to the turn-complete channel and dispatch each event.
Opens a Redis pub/sub subscription on :data:`TURN_COMPLETE_CHANNEL`
(``game:turn:complete``) and loops over inbound messages while
``self._running`` is set, decoding each JSON payload and spawning a
fire-and-forget :meth:`_handle_turn` task per event so validation never
blocks the listen loop. Malformed messages are logged at debug and
skipped; :class:`asyncio.CancelledError` ends the loop quietly and any
other failure is logged before the task exits.
Runs only as the background task created in :meth:`start`; it is not
called directly elsewhere.
Returns:
None.
"""
try:
pubsub = self._redis.pubsub()
await pubsub.subscribe(TURN_COMPLETE_CHANNEL)
logger.info(
"GameTurnAgent subscribed to %s",
TURN_COMPLETE_CHANNEL,
)
async for message in pubsub.listen():
if not self._running:
break
if message["type"] != "message":
continue
try:
data_str = (
message["data"]
if isinstance(message["data"], str)
else message["data"].decode()
)
data = json.loads(data_str)
asyncio.create_task(self._handle_turn(data))
except Exception:
logger.debug(
"GameTurnAgent: bad message",
exc_info=True,
)
except asyncio.CancelledError:
pass
except Exception:
logger.exception("GameTurnAgent listener crashed")
async def _handle_turn(self, data: dict[str, Any]) -> None:
"""Process a turn completion event.
Proofreading is now done INLINE by the renderer. This agent
only validates roster coverage and pings Star if she forgot
active players or included dormant ones.
"""
t0 = time.monotonic()
channel_id = data.get("channel_id", "")
if not channel_id:
return
# The renderer already consumed the draft key by now.
# Use the player names from the event payload for validation.
active_player_names: list[str] = data.get(
"active_player_names",
[],
)
def _emit(status: str, missing_list: list = [], dormant_list: list = []):
"""Publish a ``game_turn_validated`` observability debug event.
Nested helper that fires a fire-and-forget ``publish_debug_event`` task
(named ``obs_game_turn_validated``) recording the validation outcome for
this turn, closing over ``channel_id`` and the ``t0`` start time to stamp
the channel, phase, status, elapsed duration, and a preview/payload of
missing and dormant-covered players. It is used at each exit point of the
enclosing :meth:`_handle_turn`.
Args:
status: Validation outcome label (e.g. ``"ok"``, ``"no_session"``,
``"missing_players"``, ``"dormant_covered"``).
missing_list: Active/idle player names that lacked choices.
dormant_list: Dormant player names that were covered despite being
dormant.
Returns:
None. The event is dispatched asynchronously as a background task.
"""
from observability import publish_debug_event
asyncio.create_task(
publish_debug_event(
"game_turn_validated",
"game_turn_agent",
channel_id=channel_id,
phase="validation",
status=status,
duration_ms=(time.monotonic() - t0) * 1000,
preview=f"channel={channel_id} missing={missing_list} dormant_covered={dormant_list}",
payload={
"channel_id": channel_id,
"missing_players": missing_list,
"dormant_covered": dormant_list,
},
),
name="obs_game_turn_validated",
)
# Get the game session for roster validation
try:
from game_session import get_or_restore_session, ActivityTier
session = await get_or_restore_session(channel_id, self._redis)
except ImportError:
session = None
if not session or not session.active:
_emit("no_session")
return
# Check what choices were ACTUALLY submitted this turn
# by re-reading the draft key (may still exist briefly)
draft_key = f"game:choices:draft:{channel_id}"
covered_names: set[str] = set()
try:
raw_blocks = await self._redis.lrange(draft_key, 0, -1)
for raw in raw_blocks:
block_str = raw if isinstance(raw, str) else raw.decode()
block = json.loads(block_str)
pname = block.get("player_name", "")
if pname:
covered_names.add(pname.upper())
except Exception:
pass
# If the draft key was already consumed by the renderer,
# we have no data to validate against. The renderer already
# handled proofreading and rendering — trust it.
if not covered_names:
logger.debug(
"GameTurnAgent: draft key already consumed for %s "
"— renderer handled it, skipping validation",
channel_id,
)
_emit("ok")
return
# Get tiered roster
tiered = session.get_all_players_tiered()
required = {
ps.user_name.upper()
for ps, tier in tiered
if tier in (ActivityTier.ACTIVE, ActivityTier.IDLE)
}
dormant_names = {
ps.user_name.upper() for ps, tier in tiered if tier == ActivityTier.DORMANT
}
# Check for dormant players that got choices (shouldn't happen)
dormant_covered = covered_names & dormant_names
if dormant_covered:
logger.info(
"GameTurnAgent: dormant players had choices: %s "
"(renderer should have stripped them)",
dormant_covered,
)
# Check for missing active/idle players
missing = required - covered_names
if missing:
await self._ping_star(
channel_id,
f"\u26e7 **S.N.E.S. ENGINE**: Missing choices for active players: "
f"**{', '.join(sorted(missing))}**. "
f"Call `set_game_choices` for each active player.",
)
status = "ok"
if missing:
status = "missing_players"
elif dormant_covered:
status = "dormant_covered"
_emit(
status, missing_list=sorted(missing), dormant_list=sorted(dormant_covered)
)
async def _ping_star(self, channel_id: str, message: str) -> None:
"""Send a system nudge into a Discord channel so Star can self-correct.
Resolves the channel through the stored Discord platform handle
(preferring the cached ``client.get_channel`` and falling back to
``client.fetch_channel``) and posts *message* there. This is how the
validator tells Star she missed choices for active players, prompting a
follow-up ``set_game_choices`` call. When no Discord platform was
supplied it logs a warning and returns; all send failures are caught and
logged so a delivery error never propagates into the listener.
Called only by :meth:`_handle_turn` when required active/idle players
are missing from the submitted choice drafts.
Args:
channel_id: The Discord channel id (as a string) to post into; it is
cast to ``int`` for the client lookup.
message: The system message text to send.
Returns:
None.
"""
if self._discord is None:
logger.warning("GameTurnAgent: no Discord platform for ping")
return
try:
client = getattr(self._discord, "client", None)
if client is None:
return
channel = client.get_channel(int(channel_id))
if channel is None:
channel = await client.fetch_channel(int(channel_id))
if channel:
await channel.send(message)
logger.info(
"GameTurnAgent: pinged Star in %s",
channel_id,
)
except Exception:
logger.warning(
"GameTurnAgent: failed to ping Star",
exc_info=True,
)
# ---- Helper: publish turn-complete from main pipeline ----
[docs]
async def publish_turn_complete(
redis: Any,
channel_id: str,
narrative: str,
game_id: str = "",
game_name: str = "",
turn_number: int = 0,
active_player_names: list[str] | None = None,
) -> None:
"""Publish a turn-completion event onto the GameTurnAgent's pub/sub channel.
Serializes the turn metadata (channel, a truncated narrative preview, game
id/name, turn number, the active player roster, and a timestamp) and
publishes it on :data:`TURN_COMPLETE_CHANNEL` (``game:turn:complete``) so the
:class:`GameTurnAgent` listener can validate roster coverage for the turn.
No-ops when *redis* is ``None``; publish failures are caught and logged so a
Redis hiccup never blocks the send pipeline.
Called from the main message pipeline in
``message_processor/generate_and_send.py`` once Star's narrative has been
sent for a game turn.
Args:
redis: An async Redis client, or ``None`` to skip publishing.
channel_id: The Discord channel id the turn belongs to.
narrative: Star's turn narrative; truncated to 3000 chars in the payload.
game_id: Optional game identifier carried through to the event.
game_name: Optional human-readable game name carried through.
turn_number: The turn index, included in the payload and debug log.
active_player_names: Names of players active this turn, used by the agent
for roster-coverage validation; ``None`` becomes an empty list.
Returns:
None.
"""
if redis is None:
return
payload = json.dumps(
{
"channel_id": channel_id,
"narrative": narrative[:3000],
"game_id": game_id,
"game_name": game_name,
"turn_number": turn_number,
"active_player_names": active_player_names or [],
"ts": time.time(),
}
)
try:
await redis.publish(TURN_COMPLETE_CHANNEL, payload)
logger.debug(
"Published turn-complete for channel %s (turn %d)",
channel_id,
turn_number,
)
except Exception as exc:
logger.warning("Failed to publish turn-complete: %s", exc)