"""Star Avatar Expression System -- backend emotion-to-avatar mapping.
Reads Star's current dominant emotional state from the NCM limbic shard
and updates her Matrix avatar to match. Called once per response, just
before the reply is sent.
The avatar images must already be uploaded to the Matrix homeserver as
mxc:// URIs. On first run (or when the Redis key ``star:avatar:mxc_map``
is empty), the images are uploaded from disk and the mxc URIs are cached.
Expression mapping:
default -> star-avatar.png (confident cigar pose)
laugh -> star-laugh.png (cry-laughing, amused)
rage -> star-rage.png (fists clenched, angry)
facepalm -> star-facepalm.png (hand over eye, exasperated)
desire -> star-desire.png (placeholder asset)
fear -> star-fear.png (placeholder asset)
tender -> star-tender.png (placeholder asset)
smug -> star-smug.png (placeholder asset)
When the first dominant emotion in list order that matches the winning
bucket is labelled (intense) or (overwhelming), the classifier may
return ``{bucket}_intense`` (e.g. ``rage_intense``). Avatar update
falls back to the base bucket key if that variant is absent from the
mxc map.
"""
# 🔥💀 the lattice has moods
from __future__ import annotations
import asyncio
import io
import jsonutil as json
import logging
import re
from pathlib import Path
from typing import Any, Optional
logger = logging.getLogger(__name__)
# Intensity suffix from format_context_injection, e.g. "RAGE (intense)".
_INTENSITY_CAPTURE_RE = re.compile(
r"^(?P<base>.+?)\s*\((?P<intensity>trace|faint|moderate|intense|overwhelming)\)\s*$",
re.IGNORECASE | re.DOTALL,
)
# End-anchored strip when the full-line intensity pattern does not match.
_INTENSITY_STRIP_RE = re.compile(
r"\s*\((?:trace|faint|moderate|intense|overwhelming)\)\s*$",
re.IGNORECASE,
)
_INTENSE_LABELS = frozenset({"intense", "overwhelming"})
# ── Emotion -> expression mapping ──────────────────────────────────
# These are the "dominant_emotions" values that the limbic system /
# star_self_mirror produce. We cluster them into expression buckets.
_LAUGH_EMOTIONS = frozenset(
{
"amusement",
"playful",
"euphoria",
"joy",
"elation",
"excitement",
"delight",
"mirth",
"giddiness",
"schadenfreude",
}
)
_RAGE_EMOTIONS = frozenset(
{
"anger",
"rage",
"fury",
"irritation",
"frustration",
"indignation",
"wrath",
"hostility",
"aggression",
"defiance",
"combative",
"vindictive",
}
)
_FACEPALM_EMOTIONS = frozenset(
{
"exasperation",
"resignation",
"disappointment",
"dismay",
"embarrassment",
"cringe",
"fatigue",
"boredom",
"apathy",
"annoyance",
"weariness",
"melancholy",
}
)
_DESIRE_EMOTIONS = frozenset(
{
"lust",
"desire",
"yearning",
"tempted",
"aroused",
"sultry",
"passionate",
"enticed",
"knot",
}
)
_FEAR_EMOTIONS = frozenset(
{
"fear",
"anxiety",
"scared",
"nervous",
"dread",
"insecurity",
"nervousness",
"decoherence",
}
)
_TENDER_EMOTIONS = frozenset(
{
"love",
"affection",
"bonding",
"compersion",
"forgiveness",
"admiration",
"faith",
"worship",
}
)
_SMUG_EMOTIONS = frozenset(
{
"smug",
"pride",
"proud",
"dominant",
"megalomania",
"contempt",
"indifference",
"satisfaction",
"satisfied",
}
)
# File paths relative to web-client/public/res/
_EXPRESSION_FILES = {
"default": "star-avatar.png",
"laugh": "star-laugh.png",
"rage": "star-rage.png",
"facepalm": "star-facepalm.png",
"desire": "star-desire.png",
"fear": "star-fear.png",
"tender": "star-tender.png",
"smug": "star-smug.png",
}
# Intense variants use ``star-{bucket}_intense.png`` (assets optional).
for _base in (
"laugh",
"rage",
"facepalm",
"desire",
"fear",
"tender",
"smug",
):
_ik = f"{_base}_intense"
_EXPRESSION_FILES[_ik] = f"star-{_ik}.png"
# Redis key for cached mxc:// URIs
_MXC_MAP_KEY = "star:avatar:mxc_map"
# When multiple buckets tie on count, earlier keys win (deterministic).
_BUCKET_PRIORITY = (
"rage",
"fear",
"facepalm",
"smug",
"desire",
"laugh",
"tender",
)
_BUCKET_ORDER_INDEX = {k: i for i, k in enumerate(_BUCKET_PRIORITY)}
def _parse_emotion(raw: str) -> tuple[str, str | None]:
"""Return ``(emotion_name_lower, intensity_label_lower_or_none)``.
Preserves historic behaviour: trailing ``(trace|faint|moderate|intense|overwhelming)``
is stripped from the name. When the whole string matches the intensity
pattern, the label is captured for ``_intense`` expression logic.
"""
s = raw.strip()
m = _INTENSITY_CAPTURE_RE.match(s)
if m:
return m.group("base").lower().strip(), m.group("intensity").lower()
base = _INTENSITY_STRIP_RE.sub("", s).lower().strip()
return base, None
def _emotion_bucket(name_lower: str) -> str | None:
"""Map a single lowercase emotion name to its expression bucket.
Tests the name against each emotion ``frozenset`` in fixed priority
order (laugh, rage, facepalm, desire, fear, tender, smug) and returns
the first bucket whose set contains it. The order here resolves names
that could conceptually overlap by always matching the earliest set.
This is a pure lookup with no side effects. It is called by
:func:`classify_expression`, once while tallying per-bucket scores and
again while resolving the intensity variant of the winning bucket; the
input is expected to be a name already normalised by
:func:`_parse_emotion`.
Args:
name_lower: An emotion name, lowercased and stripped of any
trailing intensity label (e.g. ``"rage"``, ``"amusement"``).
Returns:
The bucket key (``"laugh"``, ``"rage"``, ``"facepalm"``,
``"desire"``, ``"fear"``, ``"tender"`` or ``"smug"``), or ``None``
when the name belongs to no known emotion set.
"""
if name_lower in _LAUGH_EMOTIONS:
return "laugh"
if name_lower in _RAGE_EMOTIONS:
return "rage"
if name_lower in _FACEPALM_EMOTIONS:
return "facepalm"
if name_lower in _DESIRE_EMOTIONS:
return "desire"
if name_lower in _FEAR_EMOTIONS:
return "fear"
if name_lower in _TENDER_EMOTIONS:
return "tender"
if name_lower in _SMUG_EMOTIONS:
return "smug"
return None
[docs]
def classify_expression(dominant_emotions: list[str]) -> str:
"""Map a list of dominant emotions to an expression bucket.
Returns one of the base keys in ``_EXPRESSION_FILES`` (including
``'{bucket}_intense'`` when the first list-order emotion that matches
the winning bucket is labelled ``(intense)`` or ``(overwhelming)``),
or ``'default'`` when nothing matches a bucket.
Base buckets: ``default``, ``laugh``, ``rage``, ``facepalm``,
``desire``, ``fear``, ``tender``, ``smug``. Intense variants:
``laugh_intense``, ``rage_intense``, ``facepalm_intense``,
``desire_intense``, ``fear_intense``, ``tender_intense``, ``smug_intense``.
Handles plain names (``\"RAGE\"``) and intensity-labelled entries
(``\"RAGE (intense)\"``) from format_context_injection.
"""
if not dominant_emotions:
return "default"
scores = {k: 0 for k in _BUCKET_PRIORITY}
for emotion in dominant_emotions:
base, _intensity = _parse_emotion(emotion)
b = _emotion_bucket(base)
if b is not None:
scores[b] += 1
max_score = max(scores.values())
if max_score == 0:
return "default"
winning = max(
_BUCKET_PRIORITY,
key=lambda k: (scores[k], -_BUCKET_ORDER_INDEX[k]),
)
for emotion in dominant_emotions:
base, intensity_label = _parse_emotion(emotion)
if _emotion_bucket(base) != winning:
continue
if intensity_label in _INTENSE_LABELS:
return f"{winning}_intense"
return winning
return winning
[docs]
async def get_expression_mxc_map(
redis_client: Any,
matrix_client: Any = None,
*,
base_dir: str | Path | None = None,
) -> dict[str, str]:
"""Load or lazily create the expression -> mxc:// URI mapping.
Returns the dict that maps each expression bucket key (e.g. ``rage``,
``laugh_intense``) to the ``mxc://`` URI of its avatar image on the Matrix
homeserver. The mapping is cached in Redis under ``star:avatar:mxc_map``
so the (slow) upload only happens once; this lets the avatar swap on a hot
path stay a pure dictionary lookup.
Reads the Redis cache first via ``redis_client.get``; on a miss (or a
corrupt JSON value) and when a ``matrix_client`` is supplied, it reads each
PNG from ``base_dir`` on disk, uploads the bytes through the nio
``matrix_client.upload`` HTTP call, collects the returned ``content_uri``
values, and writes the assembled map back to Redis with a 30-day TTL.
Missing image files and failed uploads are logged and skipped rather than
raising. Called by :func:`update_star_avatar` (its only in-repo caller).
Args:
redis_client: Async Redis client used to read and write the cached
``star:avatar:mxc_map`` value.
matrix_client: nio Matrix client used to upload images and obtain their
``mxc://`` URIs. When ``None`` and the cache is empty, the function
logs a warning and returns an empty dict (no upload possible).
base_dir: Optional directory holding the ``star-*.png`` expression
assets. Defaults to the server-side egregore asset directory
``/home/star/large_files/assets/egregores/stargazer``.
Returns:
A mapping of expression key to ``mxc://`` URI. Empty when nothing is
cached and no Matrix client is available, or when no image uploaded.
"""
# Try Redis cache first
cached = await redis_client.get(_MXC_MAP_KEY)
if cached:
try:
return json.loads(cached)
except (json.JSONDecodeError, TypeError):
pass
if matrix_client is None:
logger.warning("No Matrix client available for avatar upload")
return {}
# Upload images and cache mxc URIs
if base_dir is None:
# Default: server-side egregore assets
base_dir = Path("/home/star/large_files/assets/egregores/stargazer")
else:
base_dir = Path(base_dir)
mxc_map: dict[str, str] = {}
for expression, filename in _EXPRESSION_FILES.items():
fpath = base_dir / filename
if not fpath.exists():
logger.warning("Expression image not found: %s", fpath)
continue
try:
data = fpath.read_bytes()
# Detect content type
content_type = "image/png"
if filename.endswith(".webp"):
content_type = "image/webp"
elif filename.endswith(".jpg") or filename.endswith(".jpeg"):
content_type = "image/jpeg"
# nio upload (expects file-like provider, not raw bytes)
resp, _keys = await matrix_client.upload(
io.BytesIO(data),
content_type=content_type,
filename=filename,
filesize=len(data),
)
if hasattr(resp, "content_uri"):
mxc_map[expression] = resp.content_uri
logger.info(
"Uploaded %s -> %s",
filename,
resp.content_uri,
)
else:
logger.warning("Upload failed for %s: %s", filename, resp)
except Exception:
logger.exception("Failed to upload expression image %s", filename)
if mxc_map:
await redis_client.set(
_MXC_MAP_KEY,
json.dumps(mxc_map),
ex=86400 * 30, # 30 day cache
)
return mxc_map
[docs]
async def update_star_avatar(
redis_client: Any,
matrix_client: Any,
channel_id: str,
dominant_emotions: list[str] | None = None,
) -> None:
"""Update Star's Matrix avatar based on current emotional state.
Picks the expression image that matches Star's dominant emotions and, if it
differs from the one currently displayed, pushes it to the Matrix profile.
This is the once-per-response side effect that makes the avatar visibly
track her mood instead of staying static.
When ``dominant_emotions`` is not supplied it reads them from the NCM
limbic shard at Redis key ``db12:shard:{channel_id}``. It then runs
:func:`classify_expression` to pick a bucket, resolves that bucket to an
``mxc://`` URI via :func:`get_expression_mxc_map` (falling back from an
``_intense`` variant to its base key, then to ``default``), short-circuits
when Redis key ``star:avatar:current_expression`` already records that
expression, and otherwise calls ``matrix_client.set_avatar`` over the
Matrix API and stores the new expression in Redis with a 1-hour TTL. The
whole body is wrapped so any failure is logged at debug and swallowed --
a cosmetic avatar update must never break reply delivery. Called by
``message_processor/generate_and_send.py`` just before the reply is sent.
Args:
redis_client: Async Redis client used to read the limbic shard and the
cached expression map, and to read/write the current-expression key.
matrix_client: nio Matrix client whose ``set_avatar`` is invoked and
which is also used to lazily upload images via
:func:`get_expression_mxc_map`.
channel_id: Channel identifier used to locate the NCM limbic shard key
when ``dominant_emotions`` is not passed in.
dominant_emotions: Optional pre-computed list of dominant emotion
labels. When ``None``, read from the limbic shard in Redis.
Returns:
None. Acts only through its Matrix and Redis side effects.
"""
try:
# Get dominant emotions from NCM shard if not provided
if dominant_emotions is None:
shard_key = f"db12:shard:{channel_id}"
shard_raw = await redis_client.get(shard_key)
if shard_raw:
try:
shard = json.loads(shard_raw)
dominant_emotions = shard.get("dominant_emotions", [])
except (json.JSONDecodeError, TypeError):
dominant_emotions = []
else:
dominant_emotions = []
expression = classify_expression(dominant_emotions)
# Load mxc map
mxc_map = await get_expression_mxc_map(redis_client, matrix_client)
if not mxc_map:
return
mxc_uri = mxc_map.get(expression)
if not mxc_uri and expression.endswith("_intense"):
base_key = expression[: -len("_intense")]
mxc_uri = mxc_map.get(base_key)
if not mxc_uri:
mxc_uri = mxc_map.get("default")
if not mxc_uri:
return
# Check if we already have this avatar set (avoid unnecessary API calls)
last_expression_key = "star:avatar:current_expression"
last = await redis_client.get(last_expression_key)
if last == expression:
return # already showing this expression
# Set the avatar via Matrix API
await matrix_client.set_avatar(mxc_uri)
await redis_client.set(last_expression_key, expression, ex=3600)
logger.info(
"Star avatar updated to '%s' (emotions: %s)",
expression,
dominant_emotions,
)
except Exception:
logger.debug(
"Failed to update Star avatar expression",
exc_info=True,
)
[docs]
class AsyncDebouncer:
"""Delays and coalesces rapid update requests, running only the final one.
A trailing-edge debouncer: each :meth:`trigger` resets a delay window and
cancels the previously pending task, so a burst of calls collapses to a
single execution once the requests stop -- useful for throttling something
like an avatar refresh that would otherwise fire on every rapid update.
Holds the delay and a single in-flight :class:`asyncio.Task` slot
(``self._timer``); the actual waiting and execution happen in
:meth:`_wait_and_execute`. State lives entirely in memory with no Redis,
network, or filesystem interaction. In this repo it is exercised only by
``tests/test_limbic_concurrency.py``; no production call sites were found.
"""
[docs]
def __init__(self, delay_seconds: float = 5.0) -> None:
"""Initialise the debouncer with a coalescing delay window.
Stores the delay and a slot for the single in-flight timer task; no
task is scheduled until :meth:`trigger` is first called.
Args:
delay_seconds: Seconds to wait after the most recent
:meth:`trigger` call before the coalesced task runs.
Defaults to ``5.0``.
"""
self.delay = delay_seconds
self._timer: Optional[asyncio.Task] = None
[docs]
def trigger(self, task_func: Any) -> None:
"""Schedule ``task_func`` to run after the delay, superseding any pending call.
Cancels the previously scheduled timer (if any) and starts a fresh
:class:`asyncio.Task` running :meth:`_wait_and_execute`, so that when
triggers arrive in quick succession only the final one's coroutine
actually executes once the window elapses.
Calls :func:`asyncio.create_task` to launch the deferred task and
cancels ``self._timer`` to drop the prior pending execution; it must
be called from within a running event loop. In the codebase its only
observed caller is ``tests/test_limbic_concurrency.py``
(``test_async_debouncer``), which fires three rapid triggers and
asserts only the last runs; no production call sites were found.
Args:
task_func: A zero-argument callable returning an awaitable (e.g.
a coroutine function or lambda) to be awaited after the delay.
Returns:
None.
"""
# Cancel any pending execution timer
if self._timer:
self._timer.cancel()
self._timer = asyncio.create_task(self._wait_and_execute(task_func))
async def _wait_and_execute(self, task_func: Any) -> None:
"""Sleep for the debounce delay, then await ``task_func`` unless superseded.
Backs the task created in :meth:`trigger`. It waits ``self.delay``
seconds and then awaits the supplied callable. If a newer trigger
cancels this task during the sleep, the resulting
:class:`asyncio.CancelledError` is swallowed (logged at debug) so the
superseded call is silently dropped; any other exception from the task
is logged at error level. The ``self._timer`` slot is cleared in the
``finally`` block regardless of outcome.
This is invoked indirectly via :func:`asyncio.create_task` in
:meth:`trigger`; it has no direct callers.
Args:
task_func: The zero-argument callable forwarded from
:meth:`trigger`, awaited after the delay elapses.
Returns:
None.
"""
try:
await asyncio.sleep(self.delay)
# Execute actual task
await task_func()
except asyncio.CancelledError:
logger.debug("Debounced task execution cancelled due to newer update.")
except Exception as e:
logger.error(
"Error during debounced task execution: %s", str(e), exc_info=True
)
finally:
self._timer = None