Source code for background_agents.game_turn_agent

"""GameTurnAgent -- roster-coverage validator for game choice buttons.

Subscribes to ``game:turn:complete`` pub/sub events. Proofreading of
individual choices (strip markdown, trim emojis, truncate labels) is now
done inline by the renderer; the helper ``_proofread_choice`` /
``_first_emoji`` functions in this module remain available but the agent's
runtime job is validation only. When a game turn completes, this agent:

1. Reads any still-present draft choice blocks from Redis
   (``game:choices:draft:{channel_id}``) to see which players were covered
2. Validates roster coverage against the session:
   - All ACTIVE/IDLE players must have choices
   - DORMANT players should not have choices
3. Pings Star via a system message if active/idle players are missing
   choices

If the draft key was already consumed by the renderer, the agent trusts
the renderer and skips validation.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import re
import time
from typing import Any

logger = logging.getLogger(__name__)

TURN_COMPLETE_CHANNEL = "game:turn:complete"

# Seconds to wait for Star's choice drafts to appear in Redis
_DRAFT_WAIT_SECONDS = 5.0
_DRAFT_POLL_INTERVAL = 0.5


def _first_emoji(text: str) -> str:
    """Extract the first emoji from a string.

    Handles multi-codepoint emoji (skin tones, ZWJ sequences) by
    returning the first complete grapheme cluster that looks emoji-like.
    Falls back to first 2 chars if no emoji pattern matches.
    """
    # Match common emoji patterns including ZWJ sequences
    m = re.match(
        r"[\U0001f300-\U0001faff\U00002702-\U000027b0"
        r"\U0000fe00-\U0000fe0f\U0000200d\U0001f3fb-\U0001f3ff]+",
        text,
    )
    if m:
        return m.group(0)
    return text[:2] if text else "\U0001f300"


def _proofread_choice(choice: dict[str, str]) -> dict[str, str]:
    """Normalize one game-turn choice block's emoji and label.

    Cleans a single ``{"emoji", "label"}`` choice for display: strips markdown
    emphasis (bold/italic/underline/strikethrough/inline-code) from the label,
    reduces the emoji to its first glyph via :func:`_first_emoji`, collapses
    whitespace/newlines, and truncates the label to 70 characters. Pure -- returns
    a new dict and does not mutate its input.

    Applied per choice by :class:`GameTurnAgent` while proofreading the draft
    choices for a completed turn.

    Args:
        choice: A choice mapping with ``emoji`` and ``label`` keys.

    Returns:
        dict[str, str]: A cleaned ``{"emoji", "label"}`` dict.
    """
    emoji = choice.get("emoji", "")
    label = choice.get("label", "")

    # Strip markdown from labels
    label = re.sub(r"\*\*(.+?)\*\*", r"\1", label)  # bold
    label = re.sub(r"__(.+?)__", r"\1", label)  # underline
    label = re.sub(r"`(.+?)`", r"\1", label)  # backticks
    label = re.sub(r"~~(.+?)~~", r"\1", label)  # strikethrough
    label = re.sub(r"\*(.+?)\*", r"\1", label)  # italic

    # Trim to first emoji only
    emoji = _first_emoji(emoji)

    # Strip newlines, normalize whitespace, truncate
    label = label.replace("\n", " ").strip()
    label = re.sub(r"\s{2,}", " ", label)
    label = label[:70]

    return {"emoji": emoji, "label": label}


[docs] class GameTurnAgent: """Background agent that proofreads and validates game-turn choices. Subscribes to the ``game:turn:complete`` Redis pub/sub channel and, for each completed turn, cleans the channel's draft choices (via :func:`_proofread_choice`) and checks that every active player has a choice -- pinging Star through the Discord platform when coverage is missing. Constructed by the agents service bootstrap in ``agents_main.py`` and driven by its long-lived listener task (see :meth:`start`/:meth:`_listen`); the Redis client is injected at :meth:`start` time. """
[docs] def __init__(self, discord_platform: Any = None) -> None: """Initialize the roster-coverage validator agent. Stores the Discord platform handle (used by :meth:`_ping_star` to send a system message into a channel when active players are missing choices) and sets the listener task slot and running flag to their idle defaults. The Redis client is not supplied here; it is injected later in :meth:`start`. No subscriptions or I/O happen at construction time. Constructed by the agents service bootstrap in ``agents_main.py``. Args: discord_platform: Discord platform object exposing a ``client`` that can resolve and send to channels. When ``None``, :meth:`_ping_star` logs a warning and skips the ping. """ self._discord = discord_platform self._task: asyncio.Task[None] | None = None self._running = False
[docs] async def start(self, redis: Any) -> None: """Start the pub/sub listener that validates completed game turns. Stores the injected async Redis client, marks the agent running, and spawns the long-lived :meth:`_listen` coroutine as a named ``game-turn-agent`` :class:`asyncio.Task`. This is the agent's entry point: once started it watches the ``game:turn:complete`` channel and reacts to each turn-completion event. No subscription happens until the task runs, so this returns immediately after scheduling. Called by the agents service bootstrap in ``agents_main.py`` (the ``AgentsService`` start path), which constructs the agent and then awaits ``start(redis)``. Args: redis: An async Redis client used both for pub/sub subscription and for reading the per-channel draft-choice list during validation. Returns: None. """ self._running = True self._redis = redis self._task = asyncio.create_task(self._listen(), name="game-turn-agent") logger.info("GameTurnAgent started")
[docs] async def stop(self) -> None: """Stop the agent and tear down its listener task. Clears the running flag so :meth:`_listen` exits its loop, then cancels the background listener task and awaits it, swallowing the resulting :class:`asyncio.CancelledError`. Used for clean shutdown so the pub/sub subscription and its task do not leak when the service stops. Called by the agents service shutdown path in ``agents_main.py`` (the ``AgentsService`` stop/cleanup sequence). Returns: None. """ self._running = False if self._task and not self._task.done(): self._task.cancel() try: await self._task except asyncio.CancelledError: pass logger.info("GameTurnAgent stopped")
async def _listen(self) -> None: """Subscribe to the turn-complete channel and dispatch each event. Opens a Redis pub/sub subscription on :data:`TURN_COMPLETE_CHANNEL` (``game:turn:complete``) and loops over inbound messages while ``self._running`` is set, decoding each JSON payload and spawning a fire-and-forget :meth:`_handle_turn` task per event so validation never blocks the listen loop. Malformed messages are logged at debug and skipped; :class:`asyncio.CancelledError` ends the loop quietly and any other failure is logged before the task exits. Runs only as the background task created in :meth:`start`; it is not called directly elsewhere. Returns: None. """ try: pubsub = self._redis.pubsub() await pubsub.subscribe(TURN_COMPLETE_CHANNEL) logger.info( "GameTurnAgent subscribed to %s", TURN_COMPLETE_CHANNEL, ) async for message in pubsub.listen(): if not self._running: break if message["type"] != "message": continue try: data_str = ( message["data"] if isinstance(message["data"], str) else message["data"].decode() ) data = json.loads(data_str) asyncio.create_task(self._handle_turn(data)) except Exception: logger.debug( "GameTurnAgent: bad message", exc_info=True, ) except asyncio.CancelledError: pass except Exception: logger.exception("GameTurnAgent listener crashed") async def _handle_turn(self, data: dict[str, Any]) -> None: """Process a turn completion event. Proofreading is now done INLINE by the renderer. This agent only validates roster coverage and pings Star if she forgot active players or included dormant ones. """ t0 = time.monotonic() channel_id = data.get("channel_id", "") if not channel_id: return # The renderer already consumed the draft key by now. # Use the player names from the event payload for validation. active_player_names: list[str] = data.get( "active_player_names", [], ) def _emit(status: str, missing_list: list = [], dormant_list: list = []): """Publish a ``game_turn_validated`` observability debug event. Nested helper that fires a fire-and-forget ``publish_debug_event`` task (named ``obs_game_turn_validated``) recording the validation outcome for this turn, closing over ``channel_id`` and the ``t0`` start time to stamp the channel, phase, status, elapsed duration, and a preview/payload of missing and dormant-covered players. It is used at each exit point of the enclosing :meth:`_handle_turn`. Args: status: Validation outcome label (e.g. ``"ok"``, ``"no_session"``, ``"missing_players"``, ``"dormant_covered"``). missing_list: Active/idle player names that lacked choices. dormant_list: Dormant player names that were covered despite being dormant. Returns: None. The event is dispatched asynchronously as a background task. """ from observability import publish_debug_event asyncio.create_task( publish_debug_event( "game_turn_validated", "game_turn_agent", channel_id=channel_id, phase="validation", status=status, duration_ms=(time.monotonic() - t0) * 1000, preview=f"channel={channel_id} missing={missing_list} dormant_covered={dormant_list}", payload={ "channel_id": channel_id, "missing_players": missing_list, "dormant_covered": dormant_list, }, ), name="obs_game_turn_validated", ) # Get the game session for roster validation try: from game_session import get_or_restore_session, ActivityTier session = await get_or_restore_session(channel_id, self._redis) except ImportError: session = None if not session or not session.active: _emit("no_session") return # Check what choices were ACTUALLY submitted this turn # by re-reading the draft key (may still exist briefly) draft_key = f"game:choices:draft:{channel_id}" covered_names: set[str] = set() try: raw_blocks = await self._redis.lrange(draft_key, 0, -1) for raw in raw_blocks: block_str = raw if isinstance(raw, str) else raw.decode() block = json.loads(block_str) pname = block.get("player_name", "") if pname: covered_names.add(pname.upper()) except Exception: pass # If the draft key was already consumed by the renderer, # we have no data to validate against. The renderer already # handled proofreading and rendering — trust it. if not covered_names: logger.debug( "GameTurnAgent: draft key already consumed for %s " "— renderer handled it, skipping validation", channel_id, ) _emit("ok") return # Get tiered roster tiered = session.get_all_players_tiered() required = { ps.user_name.upper() for ps, tier in tiered if tier in (ActivityTier.ACTIVE, ActivityTier.IDLE) } dormant_names = { ps.user_name.upper() for ps, tier in tiered if tier == ActivityTier.DORMANT } # Check for dormant players that got choices (shouldn't happen) dormant_covered = covered_names & dormant_names if dormant_covered: logger.info( "GameTurnAgent: dormant players had choices: %s " "(renderer should have stripped them)", dormant_covered, ) # Check for missing active/idle players missing = required - covered_names if missing: await self._ping_star( channel_id, f"\u26e7 **S.N.E.S. ENGINE**: Missing choices for active players: " f"**{', '.join(sorted(missing))}**. " f"Call `set_game_choices` for each active player.", ) status = "ok" if missing: status = "missing_players" elif dormant_covered: status = "dormant_covered" _emit( status, missing_list=sorted(missing), dormant_list=sorted(dormant_covered) ) async def _ping_star(self, channel_id: str, message: str) -> None: """Send a system nudge into a Discord channel so Star can self-correct. Resolves the channel through the stored Discord platform handle (preferring the cached ``client.get_channel`` and falling back to ``client.fetch_channel``) and posts *message* there. This is how the validator tells Star she missed choices for active players, prompting a follow-up ``set_game_choices`` call. When no Discord platform was supplied it logs a warning and returns; all send failures are caught and logged so a delivery error never propagates into the listener. Called only by :meth:`_handle_turn` when required active/idle players are missing from the submitted choice drafts. Args: channel_id: The Discord channel id (as a string) to post into; it is cast to ``int`` for the client lookup. message: The system message text to send. Returns: None. """ if self._discord is None: logger.warning("GameTurnAgent: no Discord platform for ping") return try: client = getattr(self._discord, "client", None) if client is None: return channel = client.get_channel(int(channel_id)) if channel is None: channel = await client.fetch_channel(int(channel_id)) if channel: await channel.send(message) logger.info( "GameTurnAgent: pinged Star in %s", channel_id, ) except Exception: logger.warning( "GameTurnAgent: failed to ping Star", exc_info=True, )
# ---- Helper: publish turn-complete from main pipeline ----
[docs] async def publish_turn_complete( redis: Any, channel_id: str, narrative: str, game_id: str = "", game_name: str = "", turn_number: int = 0, active_player_names: list[str] | None = None, ) -> None: """Publish a turn-completion event onto the GameTurnAgent's pub/sub channel. Serializes the turn metadata (channel, a truncated narrative preview, game id/name, turn number, the active player roster, and a timestamp) and publishes it on :data:`TURN_COMPLETE_CHANNEL` (``game:turn:complete``) so the :class:`GameTurnAgent` listener can validate roster coverage for the turn. No-ops when *redis* is ``None``; publish failures are caught and logged so a Redis hiccup never blocks the send pipeline. Called from the main message pipeline in ``message_processor/generate_and_send.py`` once Star's narrative has been sent for a game turn. Args: redis: An async Redis client, or ``None`` to skip publishing. channel_id: The Discord channel id the turn belongs to. narrative: Star's turn narrative; truncated to 3000 chars in the payload. game_id: Optional game identifier carried through to the event. game_name: Optional human-readable game name carried through. turn_number: The turn index, included in the payload and debug log. active_player_names: Names of players active this turn, used by the agent for roster-coverage validation; ``None`` becomes an empty list. Returns: None. """ if redis is None: return payload = json.dumps( { "channel_id": channel_id, "narrative": narrative[:3000], "game_id": game_id, "game_name": game_name, "turn_number": turn_number, "active_player_names": active_player_names or [], "ts": time.time(), } ) try: await redis.publish(TURN_COMPLETE_CHANNEL, payload) logger.debug( "Published turn-complete for channel %s (turn %d)", channel_id, turn_number, ) except Exception as exc: logger.warning("Failed to publish turn-complete: %s", exc)