Source code for star_avatar

"""Star Avatar Expression System  --  backend emotion-to-avatar mapping.

Reads Star's current dominant emotional state from the NCM limbic shard
and updates her Matrix avatar to match.  Called once per response, just
before the reply is sent.

The avatar images must already be uploaded to the Matrix homeserver as
mxc:// URIs.  On first run (or when the Redis key ``star:avatar:mxc_map``
is empty), the images are uploaded from disk and the mxc URIs are cached.

Expression mapping:
    default   ->  star-avatar.png     (confident cigar pose)
    laugh     ->  star-laugh.png      (cry-laughing, amused)
    rage      ->  star-rage.png       (fists clenched, angry)
    facepalm  ->  star-facepalm.png   (hand over eye, exasperated)
    desire    ->  star-desire.png     (placeholder asset)
    fear      ->  star-fear.png       (placeholder asset)
    tender    ->  star-tender.png     (placeholder asset)
    smug      ->  star-smug.png       (placeholder asset)

    When the first dominant emotion in list order that matches the winning
    bucket is labelled (intense) or (overwhelming), the classifier may
    return ``{bucket}_intense`` (e.g. ``rage_intense``).  Avatar update
    falls back to the base bucket key if that variant is absent from the
    mxc map.
"""

# 🔥💀 the lattice has moods

from __future__ import annotations

import asyncio
import io
import jsonutil as json
import logging
import re
from pathlib import Path
from typing import Any, Optional

logger = logging.getLogger(__name__)

# Whole-string form of an intensity-labelled emotion such as "RAGE (intense)":
# everything before the parenthesised label is captured as ``base`` and the
# label itself as ``intensity``.  Matching is case-insensitive and ``.`` also
# matches newlines.
_INTENSITY_CAPTURE_RE = re.compile(
    r"^(?P<base>.+?)\s*\((?P<intensity>trace|faint|moderate|intense|overwhelming)\)\s*$",
    flags=re.IGNORECASE | re.DOTALL,
)

# Fallback used when the pattern above does not match: removes a trailing
# "(<intensity>)" suffix (and surrounding whitespace) anchored at the end.
_INTENSITY_STRIP_RE = re.compile(
    r"\s*\((?:trace|faint|moderate|intense|overwhelming)\)\s*$",
    flags=re.IGNORECASE,
)

# Intensity labels that select the ``{bucket}_intense`` expression variant.
_INTENSE_LABELS = frozenset(("intense", "overwhelming"))

# ── Emotion -> expression mapping ──────────────────────────────────
# These are the "dominant_emotions" values that the limbic system /
# star_self_mirror produce.  We cluster them into expression buckets.

# Each set lists the lowercase emotion names that select one expression
# bucket.  Membership is tested by ``_emotion_bucket``, which returns the
# first set that contains the name.

# -> "laugh"
_LAUGH_EMOTIONS = frozenset((
    "amusement", "playful", "euphoria", "joy", "elation",
    "excitement", "delight", "mirth", "giddiness", "schadenfreude",
))

# -> "rage"
_RAGE_EMOTIONS = frozenset((
    "anger", "rage", "fury", "irritation", "frustration", "indignation",
    "wrath", "hostility", "aggression", "defiance", "combative", "vindictive",
))

# -> "facepalm"
_FACEPALM_EMOTIONS = frozenset((
    "exasperation", "resignation", "disappointment", "dismay",
    "embarrassment", "cringe", "fatigue", "boredom",
    "apathy", "annoyance", "weariness", "melancholy",
))

# -> "desire"
_DESIRE_EMOTIONS = frozenset((
    "lust", "desire", "yearning", "tempted", "aroused",
    "sultry", "passionate", "enticed", "knot",
))

# -> "fear"
_FEAR_EMOTIONS = frozenset((
    "fear", "anxiety", "scared", "nervous",
    "dread", "insecurity", "nervousness", "decoherence",
))

# -> "tender"
_TENDER_EMOTIONS = frozenset((
    "love", "affection", "bonding", "compersion",
    "forgiveness", "admiration", "faith", "worship",
))

# -> "smug"
_SMUG_EMOTIONS = frozenset((
    "smug", "pride", "proud", "dominant", "megalomania",
    "contempt", "indifference", "satisfaction", "satisfied",
))

# Image filename for each expression key.  ``get_expression_mxc_map`` joins
# these onto its ``base_dir``.
# NOTE(review): this table was originally described as relative to
# web-client/public/res/, but the loader defaults to a different directory.
_EXPRESSION_FILES = dict(
    default="star-avatar.png",
    laugh="star-laugh.png",
    rage="star-rage.png",
    facepalm="star-facepalm.png",
    desire="star-desire.png",
    fear="star-fear.png",
    tender="star-tender.png",
    smug="star-smug.png",
)

# Every non-default bucket also gets a ``{bucket}_intense`` entry pointing at
# ``star-{bucket}_intense.png`` (those assets are optional).  The bucket list
# is materialised before the loop starts adding keys to the dict.
for _base in [_key for _key in _EXPRESSION_FILES if _key != "default"]:
    _ik = _base + "_intense"
    _EXPRESSION_FILES[_ik] = "star-" + _ik + ".png"

# Redis key under which the expression -> mxc:// URI map is cached.
_MXC_MAP_KEY = "star:avatar:mxc_map"

# Tie-break order: when several buckets have the same count, the one listed
# earlier here wins, which keeps the result deterministic.
_BUCKET_PRIORITY = ("rage", "fear", "facepalm", "smug", "desire", "laugh", "tender")

# Bucket name -> its position in ``_BUCKET_PRIORITY`` (0 = highest priority).
_BUCKET_ORDER_INDEX = dict(zip(_BUCKET_PRIORITY, range(len(_BUCKET_PRIORITY))))


def _parse_emotion(raw: str) -> tuple[str, str | None]:
    """Split a raw emotion label into ``(name, intensity)``, both lowercased.

    Input such as ``"RAGE (intense)"`` yields ``("rage", "intense")``.  When
    the stripped string does not match the full ``<name> (<intensity>)``
    pattern, any trailing ``(trace|faint|moderate|intense|overwhelming)``
    suffix is still removed from the name, but the intensity is reported as
    ``None``.

    Args:
        raw: Emotion label, optionally followed by a parenthesised intensity.

    Returns:
        A ``(emotion_name_lower, intensity_label_lower_or_none)`` tuple.
    """
    text = raw.strip()
    captured = _INTENSITY_CAPTURE_RE.match(text)
    if captured is None:
        # No usable "<name> (<intensity>)" split: only strip a trailing label.
        stripped_name = _INTENSITY_STRIP_RE.sub("", text)
        return stripped_name.lower().strip(), None

    name = captured.group("base").lower().strip()
    label = captured.group("intensity").lower()
    return name, label


def _emotion_bucket(name_lower: str) -> str | None:
    """Return the expression bucket that a lowercase emotion name belongs to.

    The emotion sets are consulted in a fixed order (laugh, rage, facepalm,
    desire, fear, tender, smug) and the first set containing the name decides
    the bucket.  The function is a pure lookup with no side effects.

    Args:
        name_lower: Emotion name, already lowercased and without an intensity
            suffix (for example ``"rage"`` or ``"amusement"``).

    Returns:
        One of ``"laugh"``, ``"rage"``, ``"facepalm"``, ``"desire"``,
        ``"fear"``, ``"tender"`` or ``"smug"``; ``None`` if no set contains
        the name.
    """
    # Order matters: the first matching set wins.
    lookup_order = (
        ("laugh", _LAUGH_EMOTIONS),
        ("rage", _RAGE_EMOTIONS),
        ("facepalm", _FACEPALM_EMOTIONS),
        ("desire", _DESIRE_EMOTIONS),
        ("fear", _FEAR_EMOTIONS),
        ("tender", _TENDER_EMOTIONS),
        ("smug", _SMUG_EMOTIONS),
    )
    for bucket, members in lookup_order:
        if name_lower in members:
            return bucket
    return None


def classify_expression(dominant_emotions: list[str]) -> str:
    """Choose the avatar expression key for a list of dominant emotions.

    Every entry is parsed with ``_parse_emotion`` and mapped to a bucket with
    ``_emotion_bucket``.  The bucket matched by the most entries wins; ties
    go to the bucket that appears first in ``_BUCKET_PRIORITY``.  If the
    first entry (in list order) belonging to the winning bucket is labelled
    ``(intense)`` or ``(overwhelming)``, ``"{bucket}_intense"`` is returned
    instead of the plain bucket name.

    Both plain names (``"RAGE"``) and intensity-labelled entries
    (``"RAGE (intense)"``) are accepted.

    Args:
        dominant_emotions: Emotion labels to classify.

    Returns:
        ``"default"`` when the list is empty or nothing matches a bucket;
        otherwise one of ``laugh``, ``rage``, ``facepalm``, ``desire``,
        ``fear``, ``tender``, ``smug`` or the corresponding ``*_intense``
        variant.
    """
    if not dominant_emotions:
        return "default"

    # Parse each entry once, keeping (bucket or None, intensity or None).
    parsed = []
    for entry in dominant_emotions:
        name, label = _parse_emotion(entry)
        parsed.append((_emotion_bucket(name), label))

    tally = dict.fromkeys(_BUCKET_PRIORITY, 0)
    for bucket, _label in parsed:
        if bucket is not None:
            tally[bucket] += 1

    if not any(tally.values()):
        return "default"

    # Highest count first; on equal counts the lower priority index wins.
    winner = min(
        _BUCKET_PRIORITY,
        key=lambda name: (-tally[name], _BUCKET_ORDER_INDEX[name]),
    )

    # The first entry of the winning bucket decides the intensity variant.
    for bucket, label in parsed:
        if bucket == winner:
            if label in _INTENSE_LABELS:
                return f"{winner}_intense"
            return winner
    return winner
async def get_expression_mxc_map(
    redis_client: Any,
    matrix_client: Any = None,
    *,
    base_dir: str | Path | None = None,
) -> dict[str, str]:
    """Load or lazily create the expression -> ``mxc://`` URI mapping.

    The mapping is cached in Redis under ``star:avatar:mxc_map``.  On a
    cache hit the decoded dict is returned directly.  On a miss (no value,
    undecodable JSON, or JSON that is not an object) and when a
    ``matrix_client`` is supplied, every file listed in ``_EXPRESSION_FILES``
    that exists in ``base_dir`` is read from disk and uploaded via
    ``matrix_client.upload``; the returned ``content_uri`` values are
    collected and, if at least one upload succeeded, written back to Redis
    with a 30-day expiry.  Missing files and failed uploads are logged and
    skipped rather than raised.

    Called by :func:`update_star_avatar`.

    Args:
        redis_client: Async Redis client used to read and write the cached
            map.
        matrix_client: Matrix client whose ``upload`` coroutine returns a
            ``(response, keys)`` pair.  When ``None`` and the cache cannot be
            used, a warning is logged and an empty dict is returned.
        base_dir: Directory holding the expression images.  Defaults to
            ``/home/star/large_files/assets/egregores/stargazer``.

    Returns:
        Mapping of expression key to ``mxc://`` URI.  Empty when nothing is
        cached and no Matrix client is available, or when no image could be
        uploaded.
    """
    # Try the Redis cache first.
    cached = await redis_client.get(_MXC_MAP_KEY)
    if cached:
        try:
            cached_map = json.loads(cached)
        except (json.JSONDecodeError, TypeError):
            logger.warning("Cached avatar mxc map is not valid JSON; rebuilding")
        else:
            # Only a JSON object is usable; anything else (list, number,
            # null, ...) would break ``.get`` lookups in the caller, so it is
            # treated as a cache miss.
            if isinstance(cached_map, dict):
                return cached_map
            logger.warning(
                "Cached avatar mxc map has unexpected type %s; rebuilding",
                type(cached_map).__name__,
            )

    if matrix_client is None:
        logger.warning("No Matrix client available for avatar upload")
        return {}

    # Upload images and cache the resulting mxc URIs.
    if base_dir is None:
        # Default: server-side egregore assets
        base_dir = Path("/home/star/large_files/assets/egregores/stargazer")
    else:
        base_dir = Path(base_dir)

    mxc_map: dict[str, str] = {}
    for expression, filename in _EXPRESSION_FILES.items():
        fpath = base_dir / filename
        if not fpath.exists():
            logger.warning("Expression image not found: %s", fpath)
            continue
        try:
            data = fpath.read_bytes()

            # Derive the content type from the file extension (PNG default).
            content_type = "image/png"
            if filename.endswith(".webp"):
                content_type = "image/webp"
            elif filename.endswith((".jpg", ".jpeg")):
                content_type = "image/jpeg"

            # The upload call is given a file-like object, not raw bytes.
            resp, _keys = await matrix_client.upload(
                io.BytesIO(data),
                content_type=content_type,
                filename=filename,
                filesize=len(data),
            )
            if hasattr(resp, "content_uri"):
                mxc_map[expression] = resp.content_uri
                logger.info(
                    "Uploaded %s -> %s",
                    filename,
                    resp.content_uri,
                )
            else:
                logger.warning("Upload failed for %s: %s", filename, resp)
        except Exception:
            # Best effort: one bad image must not prevent the others.
            logger.exception("Failed to upload expression image %s", filename)

    if mxc_map:
        await redis_client.set(
            _MXC_MAP_KEY,
            json.dumps(mxc_map),
            ex=86400 * 30,  # 30 day cache
        )

    return mxc_map
async def update_star_avatar(
    redis_client: Any,
    matrix_client: Any,
    channel_id: str,
    dominant_emotions: list[str] | None = None,
) -> None:
    """Update Star's Matrix avatar to match her current emotional state.

    When ``dominant_emotions`` is not supplied, it is read from the JSON
    value stored at Redis key ``db12:shard:{channel_id}`` (field
    ``dominant_emotions``); a missing or undecodable value counts as an
    empty list.  The emotions are classified with
    :func:`classify_expression`, and the resulting key is resolved to an
    ``mxc://`` URI via :func:`get_expression_mxc_map`, falling back from an
    ``_intense`` variant to its base key and then to ``default``.  If Redis
    key ``star:avatar:current_expression`` already holds that expression,
    nothing is sent; otherwise ``matrix_client.set_avatar`` is awaited and
    the expression is stored in Redis with a one-hour expiry.

    The whole body is wrapped so that any exception is logged at debug
    level and swallowed: a cosmetic avatar update must not break the caller.

    Args:
        redis_client: Async Redis client used for the shard, the cached
            expression map and the current-expression key.
        matrix_client: Matrix client providing ``set_avatar`` (and ``upload``
            for the lazy map build).
        channel_id: Channel identifier used to build the shard key when
            ``dominant_emotions`` is ``None``.
        dominant_emotions: Optional pre-computed list of emotion labels.

    Returns:
        None.  The function acts only through Matrix and Redis side effects.
    """
    try:
        # Get dominant emotions from the NCM shard if not provided.
        if dominant_emotions is None:
            shard_key = f"db12:shard:{channel_id}"
            shard_raw = await redis_client.get(shard_key)
            if shard_raw:
                try:
                    shard = json.loads(shard_raw)
                    dominant_emotions = shard.get("dominant_emotions", [])
                except (json.JSONDecodeError, TypeError):
                    dominant_emotions = []
            else:
                dominant_emotions = []

        expression = classify_expression(dominant_emotions)

        # Load the expression -> mxc:// map.
        mxc_map = await get_expression_mxc_map(redis_client, matrix_client)
        if not mxc_map:
            return

        # Resolve the URI: exact key, then base bucket, then default.
        mxc_uri = mxc_map.get(expression)
        if not mxc_uri and expression.endswith("_intense"):
            base_key = expression[: -len("_intense")]
            mxc_uri = mxc_map.get(base_key)
        if not mxc_uri:
            mxc_uri = mxc_map.get("default")
        if not mxc_uri:
            return

        # Skip the API call when this expression is already recorded.
        last_expression_key = "star:avatar:current_expression"
        last = await redis_client.get(last_expression_key)
        # A Redis client that does not decode responses returns ``bytes``,
        # which never compares equal to ``str``; normalise so the
        # short-circuit works either way.
        # NOTE(review): client decode configuration is not visible here.
        if isinstance(last, bytes):
            last = last.decode("utf-8", errors="replace")
        if last == expression:
            return  # already showing this expression

        # Set the avatar via the Matrix API and remember what was set.
        await matrix_client.set_avatar(mxc_uri)
        await redis_client.set(last_expression_key, expression, ex=3600)
        logger.info(
            "Star avatar updated to '%s' (emotions: %s)",
            expression,
            dominant_emotions,
        )

    except Exception:
        logger.debug(
            "Failed to update Star avatar expression",
            exc_info=True,
        )
class AsyncDebouncer:
    """Delay and coalesce rapid update requests, running only the last one.

    This is a trailing-edge debouncer: each :meth:`trigger` cancels the
    previously pending task and starts a new delay window, so a burst of
    calls collapses into a single execution once the calls stop.

    The instance holds the delay and one :class:`asyncio.Task` slot
    (``self._timer``).  All state is in memory.
    """

    def __init__(self, delay_seconds: float = 5.0) -> None:
        """Store the delay; nothing is scheduled until :meth:`trigger`.

        Args:
            delay_seconds: Seconds to wait after the most recent
                :meth:`trigger` call before the callable runs.
        """
        self.delay = delay_seconds
        # The currently pending (or running) debounced task, if any.
        self._timer: Optional[asyncio.Task] = None

    def trigger(self, task_func: Any) -> None:
        """Schedule ``task_func`` after the delay, superseding any pending call.

        Uses :func:`asyncio.create_task`, so it must be called while an
        event loop is running.

        Args:
            task_func: Zero-argument callable returning an awaitable; it is
                awaited once the delay elapses without a newer trigger.

        Returns:
            None.
        """
        # Cancel any pending execution before replacing it.
        pending = self._timer
        if pending is not None:
            pending.cancel()

        self._timer = asyncio.create_task(self._wait_and_execute(task_func))

    async def _wait_and_execute(self, task_func: Any) -> None:
        """Sleep for the delay, then await ``task_func`` unless superseded.

        Cancellation by a newer :meth:`trigger` is logged at debug level and
        swallowed; any other exception from ``task_func`` is logged at error
        level.  On exit the ``_timer`` slot is cleared only if it still
        refers to this task.

        Args:
            task_func: The callable forwarded from :meth:`trigger`.

        Returns:
            None.
        """
        try:
            await asyncio.sleep(self.delay)
            # Execute the actual task.
            await task_func()
        except asyncio.CancelledError:
            logger.debug("Debounced task execution cancelled due to newer update.")
        except Exception as e:
            logger.error(
                "Error during debounced task execution: %s", str(e), exc_info=True
            )
        finally:
            # A superseded task unwinds *after* trigger() has stored its
            # replacement in ``_timer``.  Clearing unconditionally would drop
            # the reference to that newer task, so the next trigger() could
            # not cancel it and both callables would run.
            if self._timer is asyncio.current_task():
                self._timer = None