Source code for conversation

"""Per-room conversation history manager backed by Redis.

Defines :class:`ConversationManager`, which keeps an in-memory sliding window of
user/assistant turns per ``room_id`` and mirrors it to Redis lists (under
``sg:conversation:{room_id}`` with a companion ``:ver`` version counter) so that
the separate inference, agents, and consolidation workers all converge on the
same history. It renders the dynamic system prompt through a
:class:`~prompt_renderer.PromptRenderer`, guards its maps with a
:class:`threading.RLock`, and uses an optimistic compare-and-set Lua script plus
a drainable set of background persist tasks to keep concurrent writers from
clobbering each other.
"""

from __future__ import annotations

import asyncio
import json
import copy
import logging
import re
import threading
import time
from typing import Any, Callable

from prompt_renderer import PromptRenderer

# Matches the ``[Message ID: <id>]`` tag every user/assistant turn carries, used
# to reconcile a Redis reload against locally-appended turns it may not yet see.
_MSG_ID_RE = re.compile(r"\[Message ID:\s*([^\]]+)\]")


def _entry_message_id(entry: dict[str, Any]) -> str | None:
    """Return the ``[Message ID: …]`` of a history entry, or ``None``.

    Handles both plain-string content and multimodal lists (the marker lives in
    the first ``text`` part).
    """
    content = entry.get("content")
    if isinstance(content, str):
        m = _MSG_ID_RE.search(content)
        return m.group(1).strip() if m else None
    if isinstance(content, list):
        for part in content:
            if isinstance(part, dict) and part.get("type") == "text":
                m = _MSG_ID_RE.search(part.get("text", ""))
                if m:
                    return m.group(1).strip()
    return None


def _merge_local_tail(
    loaded: list[dict[str, Any]], current: list[dict[str, Any]]
) -> list[dict[str, Any]]:
    """Combine a Redis snapshot with turns appended locally after it was taken.

    A reload reads Redis at one point in time. A local append may land after
    that read but before the in-memory history is overwritten, so the newest
    turn (e.g. an image) can be missing from *loaded* yet present at the end
    of *current*. Replacing memory outright would lose it.

    This walks *current* backwards from its last entry. It keeps the longest
    run of entries that carry a message-id absent from *loaded*. It stops at
    the first entry that is id-less or whose id *loaded* already has. That run,
    in original order, is appended to *loaded*. Older, edited or deleted turns
    before the common point are therefore never brought back, and no snapshot
    turn is duplicated.

    If *loaded* shares no id with *current* (e.g. it is empty), the whole
    id-bearing tail of *current* is kept.

    Args:
        loaded: History just read from Redis.
        current: The in-memory history about to be replaced.

    Returns:
        *loaded* itself when there is nothing to carry over. Otherwise a new
        list of *loaded* followed by the carried-over local entries.
    """
    if not current:
        return loaded

    known = {ident for ident in map(_entry_message_id, loaded) if ident}

    # ``cut`` ends up at the index where the unseen local tail begins.
    cut = len(current)
    while cut > 0:
        ident = _entry_message_id(current[cut - 1])
        if not ident or ident in known:
            break
        cut -= 1

    if cut == len(current):
        return loaded
    return loaded + current[cut:]

# Module-level logger; the Redis version-read path reports failures through it
# at debug level.
logger: logging.Logger = logging.getLogger(__name__)


class ConversationManager:
    """Keep a sliding window of conversation turns for every room.

    Each ``room_id`` maps to its own list of ``{"role": ..., "content": ...}``
    entries. Content is a plain string or a list of multimodal parts.

    The system prompt is not stored. It is rendered on demand by a
    :class:`PromptRenderer` from a Jinja2 template, so it can reflect room- and
    tool-specific context. At most *max_history* user/assistant messages are
    retained per room, which keeps memory bounded.

    **Concurrency:** the per-channel :class:`~message_queue.MessageQueue`
    normally serialises inference for a ``(platform, channel_id)``, so a room
    usually has a single writer. A :class:`threading.RLock` still guards
    ``_histories`` and the related maps. That covers coroutines touching the
    same room (e.g. tools) and concurrent updates to different channels.
    """
[docs] def __init__( self, prompt_renderer: PromptRenderer, max_history: int = 100, redis: Any | None = None, ) -> None: """Initialize the instance. Args: prompt_renderer (PromptRenderer): The prompt renderer value. max_history (int): The max history value. redis (Any | None): Redis client. """ self.prompt_renderer = prompt_renderer self.max_history = max_history self._redis = redis self._lock = threading.RLock() # room_id -> list of {"role": ..., "content": ...} # content may be a plain string or a list of multimodal # content parts (images, audio, video, files). self._histories: dict[str, list[dict[str, Any]]] = {} # Per-channel overrides for max_history (set via tool). self._channel_overrides: dict[str, int] = {} # Channels that need re-backfilling after a limit increase. self._rebackfill_requested: set[str] = set() # Last-activity timestamp per room for stale-channel GC. self._last_activity: dict[str, float] = {} # Redis sequence counters mirrored in memory after hydrate. self._redis_versions: dict[str, int] = {} # In-flight fire-and-forget persist tasks per room (the sync # append/edit path). Tracked so a reload can drain them before # overwriting in-memory history, and so the tasks aren't garbage # collected mid-flight (``loop.create_task`` keeps no implicit ref). self._pending_persists: dict[str, set[asyncio.Task[Any]]] = {}
def _conversation_key(self, room_id: str) -> str: """Build the Redis list key holding a room's persisted message history. Returns the canonical ``sg:conversation:{room_id}`` key under which a room's serialized messages are stored as a Redis list. This is the key targeted by the ``RPUSH``/``LTRIM`` writes in :meth:`_persist_append`, the atomic ``DEL``+``RPUSH`` rebuild in :meth:`_persist_full_history` (via its Lua script), and the ``LRANGE`` read in :meth:`_load_from_redis` that hydrates the in-memory cache. This is a pure string formatter: it performs no Redis or other I/O and only derives the key name. It is called by :meth:`_persist_append`, :meth:`_load_from_redis`, and :meth:`_persist_full_history`. Args: room_id (str): The room/channel identifier (typically ``platform:channel_id``) whose history list is being addressed. Returns: str: The Redis list key ``sg:conversation:{room_id}``. """ return f"sg:conversation:{room_id}" def _conversation_version_key(self, room_id: str) -> str: """Build the Redis key holding a room's monotonic history version counter. Returns ``sg:conversation:{room_id}:ver``, the integer counter that is ``INCR``-ed on every persisted mutation so concurrent readers can detect that another writer changed the history. :meth:`_persist_append` and :meth:`_persist_full_history` increment it (the latter atomically inside its Lua script alongside the list rebuild), while :meth:`_load_from_redis` and the append fast-path read it via ``GET`` to mirror the remote version into ``self._redis_versions``. This is a pure string formatter performing no I/O. It is called by :meth:`_persist_append`, :meth:`_load_from_redis`, :meth:`_persist_full_history`, and the version check near the top of the append path. Args: room_id (str): The room/channel identifier whose version counter is being addressed. Returns: str: The Redis version key ``sg:conversation:{room_id}:ver``. 
""" return f"sg:conversation:{room_id}:ver" # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
def append(
    self,
    room_id: str,
    role: str,
    content: str | list[dict[str, Any]],
) -> None:
    """Record a new turn for *room_id*, persisting it in the background.

    *content* is plain text or a list of OpenRouter multimodal content parts.
    The in-memory append (and any trimming) happens immediately via
    ``_append_locked``.

    When Redis is configured, the persist coroutine is handed to the tracked
    background-task set, so a later reload can drain it first. Callers that
    must not proceed until the write is durable should await
    :meth:`append_async` instead.
    """
    stored = self._append_locked(room_id, role, content)
    if self._redis is None:
        return
    self._schedule_persist(room_id, self._persist_append(room_id, stored))
[docs] def last_history_role(self, room_id: str) -> str | None: """Return the role of the last stored message for *room_id*. History entries are ``user`` or ``assistant`` turns (plus optional system prompt only in :meth:`get_messages`). Returns ``None`` when the room has no stored history. """ with self._lock: history = self._histories.get(room_id) if not history: return None return history[-1].get("role")
def get_messages(
    self,
    room_id: str,
    room_context: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """Build the prompt message list for *room_id*.

    The system prompt is rendered from *room_context* by the
    :class:`PromptRenderer`, so the Jinja2 template can use room-level
    variables such as ``room_name``, ``room_id`` or ``sender``. It is included
    only when it is not blank. The room's stored turns follow, as deep copies.

    Args:
        room_id (str): Room whose history is returned.
        room_context (dict[str, Any] | None): Variables forwarded to the
            prompt template.

    Returns:
        list[dict[str, Any]]: Optional system message followed by the turns.
    """
    rendered = self.prompt_renderer.render(room_context)
    prefix: list[dict[str, Any]] = (
        [{"role": "system", "content": rendered}] if rendered.strip() else []
    )
    with self._lock:
        # Copy under the lock so callers (e.g. prompt injection) can mutate
        # the returned dicts without altering the stored sliding history.
        stored = copy.deepcopy(list(self._histories.get(room_id, [])))
    return prefix + stored
async def ensure_fresh_from_redis(self, room_id: str) -> None:
    """Reload *room_id* from Redis when another writer has moved it ahead.

    A cheap version check guards the expensive reload:

    1. The room's ``sg:conversation:{room_id}:ver`` counter is fetched with
       ``GET``.
    2. It is compared with the version mirrored locally in
       ``self._redis_versions``.
    3. Only when the remote value is larger is :meth:`_load_from_redis`
       called to replace the in-memory history.

    Pending background persists for the room are drained before the version
    is read. This puts this process's own not-yet-written turns in Redis
    first, so the comparison is not fooled by them. It also ensures a reload
    cannot drop a turn that was just appended locally. The local version is
    therefore read only after the drain.

    A failure while reading the remote version is logged at debug level and
    treated as "no refresh needed". Does nothing when Redis is not
    configured. Called by :meth:`get_messages_async` on the warm-cache path.

    Args:
        room_id (str): Room/channel whose history may need re-hydrating.
    """
    if self._redis is None:
        return

    # Land in-flight fire-and-forget append/edit persists before reading any
    # version, so our own writes cannot look like "we're behind" and a
    # reload below cannot overwrite memory with a list missing a fresh turn.
    await self._drain_pending_persists(room_id)

    try:
        raw = await self._redis.get(self._conversation_version_key(room_id))
        remote = int(raw or 0)
    except Exception:
        logger.debug(
            "Failed to read conversation version for %s",
            room_id,
            exc_info=True,
        )
        return

    with self._lock:
        behind = remote > self._redis_versions.get(room_id, 0)
    if behind:
        await self._load_from_redis(room_id)
async def get_messages_async(
    self,
    room_id: str,
    room_context: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
    """Async counterpart of :meth:`get_messages` that also syncs with Redis.

    When Redis is configured, a room absent from the in-memory cache is
    hydrated via :meth:`_load_from_redis`. A cached room is checked with
    :meth:`ensure_fresh_from_redis`.

    Jinja2 rendering and the history snapshot (``_get_history_snapshot``)
    then run in worker threads via :func:`asyncio.to_thread`. This keeps
    CPU-bound work off the event loop.

    Args:
        room_id (str): Room whose history is returned.
        room_context (dict[str, Any] | None): Variables forwarded to the
            prompt template.

    Returns:
        list[dict[str, Any]]: System message (only when not blank) followed
        by the snapshot of stored turns.
    """
    with self._lock:
        cached = room_id in self._histories

    if self._redis is not None:
        if cached:
            # Warm cache: reload only if another worker advanced Redis.
            await self.ensure_fresh_from_redis(room_id)
        else:
            # Cold cache: hydrate from Redis.
            await self._load_from_redis(room_id)

    rendered = await asyncio.to_thread(
        self.prompt_renderer.render,
        room_context,
    )
    head = [{"role": "system", "content": rendered}] if rendered.strip() else []
    tail = await asyncio.to_thread(self._get_history_snapshot, room_id)
    return [*head, *tail]
def update_message(
    self,
    room_id: str,
    message_id: str,
    new_content: str | list[dict[str, Any]],
) -> bool:
    """Replace the content of the entry tagged ``[Message ID: {message_id}]``.

    The newest matching entry in *room_id*'s history is edited in memory.
    When an entry changed and Redis is configured, a full-history persist is
    scheduled in the background. That persist receives the same edit as its
    ``reapply`` callback, so it can redo the edit if it has to retry.

    Returns:
        bool: ``True`` if a matching entry was found and edited.
    """
    edited = self._update_message_locked(room_id, message_id, new_content)
    if not edited or self._redis is None:
        return edited

    def redo_edit() -> bool:
        return self._update_message_locked(room_id, message_id, new_content)

    self._schedule_persist(
        room_id,
        self._persist_full_history(room_id, reapply=redo_edit),
    )
    return edited
def _update_message_locked( self, room_id: str, message_id: str, new_content: str | list[dict[str, Any]], ) -> bool: """Mutation-only core of :meth:`update_message` (no persistence). Shared by the sync public and :meth:`update_message_async` so the in-memory edit and the Redis persist are decoupled and never both fire. """ found = False with self._lock: history = self._histories.get(room_id) if history: marker = f"[Message ID: {message_id}]" for entry in reversed(history): content = entry["content"] if isinstance(content, str): if marker in content: entry["content"] = new_content found = True break elif isinstance(content, list): for part in content: if part.get("type") == "text" and marker in part.get( "text", "" ): # Preserve media parts (image_url / file / etc.) # on a text-only edit. A Discord MESSAGE_UPDATE — # which fires right after an image post for embed/ # proxy finalization — carries only text; # replacing the whole entry wholesale here was # wiping the image, leaving the model blind on # that turn (it would hallucinate a description). # Attachments are immutable on edit, so keep them # and swap only the text. media_parts = [ p for p in content if isinstance(p, dict) and p.get("type") != "text" ] if media_parts and isinstance(new_content, str): entry["content"] = [ {"type": "text", "text": new_content}, *media_parts, ] else: entry["content"] = new_content found = True break if found: break return found
def mark_deleted(
    self,
    room_id: str,
    message_id: str,
    deleted_at_iso: str,
) -> bool:
    """Tag the entry for *message_id* with ``[deleted at TIMESTAMP]``.

    The original content is kept, so the bot retains full context. When an
    entry was tagged and Redis is configured, a full-history persist is
    scheduled in the background, with the same tagging as its ``reapply``
    callback.

    Returns:
        bool: ``True`` if the message was found and tagged.
    """
    tagged = self._mark_deleted_locked(room_id, message_id, deleted_at_iso)
    if not tagged or self._redis is None:
        return tagged

    def redo_tag() -> bool:
        return self._mark_deleted_locked(room_id, message_id, deleted_at_iso)

    self._schedule_persist(
        room_id,
        self._persist_full_history(room_id, reapply=redo_tag),
    )
    return tagged
def _mark_deleted_locked( self, room_id: str, message_id: str, deleted_at_iso: str, ) -> bool: """Mutation-only core of :meth:`mark_deleted`: tag the entry, no persistence. Holds ``self._lock`` and scans the room's history backward for the entry carrying ``[Message ID: {message_id}]`` (handling both plain-string and multimodal list content), splicing a ``[deleted at {deleted_at_iso}]`` marker in after the message-ID tag exactly once and skipping entries already tagged. Splitting this from the persist lets the sync and async paths each drive Redis their own way and lets it serve as the *reapply* callback when :meth:`_persist_full_history` retries on a CAS conflict. Touches only in-memory state; performs no Redis or other I/O. Called by :meth:`mark_deleted` and :meth:`mark_deleted_async`, and passed as the ``reapply`` lambda to :meth:`_persist_full_history` from both. Args: room_id (str): The room whose history is searched. message_id (str): The platform message id to locate via its marker. deleted_at_iso (str): ISO timestamp inserted into the deleted tag. Returns: bool: ``True`` if a matching entry was found and tagged, else ``False``. """ found = False with self._lock: history = self._histories.get(room_id) if history: marker = f"[Message ID: {message_id}]" tag = f" [deleted at {deleted_at_iso}]" for entry in reversed(history): content = entry["content"] if isinstance(content, str): if marker in content and tag not in content: entry["content"] = content.replace( marker, marker + tag, 1, ) found = True break elif isinstance(content, list): for part in content: text = part.get("text", "") if ( part.get("type") == "text" and marker in text and tag not in text ): part["text"] = text.replace( marker, marker + tag, 1, ) found = True break if found: break return found
def patch_reactions(
    self,
    room_id: str,
    message_id: str,
    reactions_str: str,
) -> bool:
    """Rewrite the ``[Reactions: ...]`` tag on the entry for *message_id*.

    Any existing reaction tag is removed. When *reactions_str* is non-empty,
    a fresh tag is inserted after the message-ID marker.

    When an entry changed and Redis is configured, a full-history persist is
    scheduled in the background, with the same rewrite as its ``reapply``
    callback.

    Returns:
        bool: ``True`` if the entry was found.
    """
    patched = self._patch_reactions_locked(room_id, message_id, reactions_str)
    if not patched or self._redis is None:
        return patched

    def redo_patch() -> bool:
        return self._patch_reactions_locked(room_id, message_id, reactions_str)

    self._schedule_persist(
        room_id,
        self._persist_full_history(room_id, reapply=redo_patch),
    )
    return patched
def _patch_reactions_locked( self, room_id: str, message_id: str, reactions_str: str, ) -> bool: """Mutation-only core of :meth:`patch_reactions`: rewrite the tag, no persistence. Holds ``self._lock`` and scans the room's history backward for the entry carrying ``[Message ID: {message_id}]`` (in plain-string or multimodal list content), strips any existing ``[Reactions: ...]`` tag with a regex, and — when *reactions_str* is non-empty — reinserts a fresh ``[Reactions: {reactions_str}]`` tag right after the message-ID marker. Kept separate from the persist so the sync and async callers each handle Redis themselves and so it can serve as the *reapply* callback on a :meth:`_persist_full_history` retry. Touches only in-memory state; performs no Redis or other I/O. Called by :meth:`patch_reactions` and :meth:`patch_reactions_async`, and passed as the ``reapply`` lambda to :meth:`_persist_full_history` from both. Args: room_id (str): The room whose history is searched. message_id (str): The platform message id to locate via its marker. reactions_str (str): The new reaction summary; empty clears the tag. Returns: bool: ``True`` if a matching entry was found and updated, else ``False``. 
""" import re reactions_pattern = re.compile(r" \[Reactions: [^\]]*\]") marker = f"[Message ID: {message_id}]" found = False with self._lock: history = self._histories.get(room_id) if history: for entry in reversed(history): content = entry["content"] if isinstance(content, str): if marker in content: content = reactions_pattern.sub("", content) if reactions_str: content = content.replace( marker, f"{marker} [Reactions: {reactions_str}]", 1, ) entry["content"] = content found = True break elif isinstance(content, list): for part in content: text = part.get("text", "") if part.get("type") == "text" and marker in text: text = reactions_pattern.sub("", text) if reactions_str: text = text.replace( marker, f"{marker} [Reactions: {reactions_str}]", 1, ) part["text"] = text found = True break if found: break return found # ------------------------------------------------------------------ # Async wrappers (offload lock-held work from the event loop) # ------------------------------------------------------------------
async def append_async(
    self,
    room_id: str,
    role: str,
    content: str | list[dict[str, Any]],
) -> None:
    """Async counterpart of :meth:`append`.

    The lock-holding in-memory append (:meth:`_append_locked`) runs in a
    worker thread so the event loop never blocks on ``self._lock``. The Redis
    write (:meth:`_persist_append`) then starts as a task on the loop. It is
    registered in ``self._pending_persists`` *before* being awaited, and
    :meth:`_discard_persist` removes it again when it finishes.

    The persist is awaited on the loop rather than running ``append`` in a
    thread. A worker thread has no running event loop, so the sync path's
    background persist would be skipped there. The turn (e.g. a user's
    image) would then never reach Redis.

    It is registered before the await because
    :meth:`_drain_pending_persists` only waits for tasks in that set. An
    untracked persist let a concurrent reload read a Redis list still missing
    this turn and overwrite it out of memory. A later full-history rewrite (an
    edit or a reaction) could then persist the turn-less snapshot and lose it
    for good. ``_preprocess_and_record`` records the user's image turn
    through this path, which is where images went missing.
    """
    message = await asyncio.to_thread(self._append_locked, room_id, role, content)
    if self._redis is None:
        return

    task = asyncio.ensure_future(self._persist_append(room_id, message))
    with self._lock:
        pending = self._pending_persists.setdefault(room_id, set())
        pending.add(task)

    def _on_done(finished: Any, rid: str = room_id) -> None:
        self._discard_persist(rid, finished)

    task.add_done_callback(_on_done)
    await task
async def update_message_async(
    self,
    room_id: str,
    message_id: str,
    new_content: str | list[dict[str, Any]],
) -> bool:
    """Async counterpart of :meth:`update_message`.

    Runs :meth:`_update_message_locked` in a worker thread. If the message was
    found and Redis is configured, it then awaits
    :meth:`_persist_full_history` directly on the event loop, passing the
    same mutation as *reapply* for CAS-conflict retries. Awaiting here
    matters: scheduling the persist from the worker thread, as the sync path
    does, would be silently skipped because no event loop runs in that
    thread. The awaited persist is not registered in
    ``self._pending_persists``.

    Returns:
        bool: ``True`` if the message was found and updated, else ``False``.
    """
    def redo() -> bool:
        return self._update_message_locked(room_id, message_id, new_content)

    updated = await asyncio.to_thread(redo)
    if updated and self._redis is not None:
        await self._persist_full_history(room_id, reapply=redo)
    return updated
async def mark_deleted_async(
    self,
    room_id: str,
    message_id: str,
    deleted_at_iso: str,
) -> bool:
    """Async counterpart of :meth:`mark_deleted`.

    Tags the entry via :meth:`_mark_deleted_locked` in a worker thread. When
    something was tagged and Redis is configured, it then awaits
    :meth:`_persist_full_history` on the event loop, with the same mutation
    as its *reapply* callback. The awaited persist is not registered in
    ``self._pending_persists``.

    Returns:
        bool: ``True`` if a matching entry was found and tagged.
    """
    def retag() -> bool:
        return self._mark_deleted_locked(room_id, message_id, deleted_at_iso)

    tagged = await asyncio.to_thread(retag)
    if not tagged or self._redis is None:
        return tagged
    await self._persist_full_history(room_id, reapply=retag)
    return tagged
async def patch_reactions_async(
    self,
    room_id: str,
    message_id: str,
    reactions_str: str,
) -> bool:
    """Async counterpart of :meth:`patch_reactions`.

    Rewrites the reactions tag via :meth:`_patch_reactions_locked` in a worker
    thread. When the entry was found and Redis is configured, it then awaits
    :meth:`_persist_full_history` on the event loop, reusing the same
    mutation as *reapply*. The awaited persist is not registered in
    ``self._pending_persists``.

    Returns:
        bool: ``True`` if the entry was found and updated.
    """
    def repatch() -> bool:
        return self._patch_reactions_locked(room_id, message_id, reactions_str)

    patched = await asyncio.to_thread(repatch)
    if not patched or self._redis is None:
        return patched
    await self._persist_full_history(room_id, reapply=repatch)
    return patched
def reap_stale(self, max_idle_seconds: float = 3600.0) -> int:
    """Evict rooms whose last activity is older than *max_idle_seconds*.

    Under ``self._lock``, every room whose ``_last_activity`` timestamp (from
    :func:`time.monotonic`) is older than the cutoff is dropped from four
    maps: the in-memory history, the per-channel limit override, the activity
    map and the Redis-version mirror.

    When Redis is configured, each reaped room's backing list is deleted and
    its ``:ver`` counter incremented in one pipeline, mirroring how
    :meth:`_persist_append` batches its writes. Each delete runs as a tracked
    task via :meth:`_schedule_persist`. That keeps a strong reference, so the
    task cannot be garbage-collected mid-flight. It also lets a later
    :meth:`_drain_pending_persists` wait for the delete to land instead of
    racing it. Bumping the counter makes version-based freshness checks in
    other processes notice that the list was wiped. Without a running event
    loop the Redis side is skipped and a warning is logged.

    Args:
        max_idle_seconds (float): Idle threshold in seconds.

    Returns:
        int: The number of rooms reaped.
    """
    cutoff = time.monotonic() - max_idle_seconds
    with self._lock:
        stale = [rid for rid, ts in self._last_activity.items() if ts < cutoff]
        for rid in stale:
            self._histories.pop(rid, None)
            self._channel_overrides.pop(rid, None)
            self._last_activity.pop(rid, None)
            # Otherwise the mirror keeps one entry per room ever seen; the
            # next _load_from_redis re-adopts the live version anyway.
            self._redis_versions.pop(rid, None)

    if self._redis is None or not stale:
        return len(stale)

    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        logger.warning("Failed to delete Redis keys on reap_stale: %s", exc)
        return len(stale)

    async def _wipe(rid: str) -> None:
        # Best-effort like the other persists: log, never raise into the
        # done-callback machinery.
        try:
            pipe = self._redis.pipeline()
            pipe.delete(self._conversation_key(rid))
            pipe.incr(self._conversation_version_key(rid))
            await pipe.execute()
        except Exception as exc:
            logger.warning("Failed to delete Redis keys on reap_stale: %s", exc)

    for rid in stale:
        self._schedule_persist(rid, _wipe(rid))
    return len(stale)
def clear(self, room_id: str) -> None:
    """Wipe a room's conversation history from memory and Redis.

    Drops the in-memory history for *room_id* under the lock. If Redis is
    configured, it deletes the backing list and increments the ``:ver``
    counter in one pipeline.

    The pipeline runs as a tracked task via :meth:`_schedule_persist`:

    - It keeps a strong reference to the task, so it is not garbage-collected
      mid-flight.
    - A subsequent reload's :meth:`_drain_pending_persists` waits for the
      delete instead of possibly reading the pre-clear list back.

    Incrementing the counter (rather than leaving it as is) keeps it
    monotonic. It also makes the wipe visible to version-based freshness
    checks in other processes. And it makes any full-history compare-and-set
    snapshotted before the clear fail, instead of rebuilding the cleared
    list.

    When there is no running event loop, the Redis side is skipped and a
    warning is logged, leaving only the in-memory wipe. Called by the message
    processor's reset/clear paths (``message_processor/processor.py``).

    Args:
        room_id (str): The room/channel whose history is being cleared.
    """
    with self._lock:
        self._histories.pop(room_id, None)
    if self._redis is None:
        return

    try:
        asyncio.get_running_loop()
    except RuntimeError as exc:
        logger.warning("Failed to delete Redis key on clear: %s", exc)
        return

    async def _wipe() -> None:
        # Best-effort: log, never raise into the done-callback machinery.
        try:
            pipe = self._redis.pipeline()
            pipe.delete(self._conversation_key(room_id))
            pipe.incr(self._conversation_version_key(room_id))
            await pipe.execute()
        except Exception as exc:
            logger.warning("Failed to delete Redis key on clear: %s", exc)

    self._schedule_persist(room_id, _wipe())
def is_rebackfill_requested(self, room_id: str) -> bool:
    """Tell whether *room_id* is flagged to re-backfill more history.

    :meth:`set_channel_limit` raises this flag when a channel's limit goes
    up, so the next turn can pull older messages to fill the larger window.
    This is a side-effect-free membership test taken under ``self._lock``.

    Args:
        room_id (str): The room/channel to check.

    Returns:
        bool: ``True`` if a re-backfill is pending for the room.
    """
    with self._lock:
        requested = room_id in self._rebackfill_requested
    return requested
def discard_rebackfill_request(self, room_id: str) -> None:
    """Clear the pending re-backfill flag for *room_id*.

    Meant to be called once the requested backfill has been carried out, so
    it is not repeated on the next turn. Idempotent: an unflagged room is left
    untouched.

    Args:
        room_id (str): The room/channel whose flag is being cleared.
    """
    with self._lock:
        if room_id in self._rebackfill_requested:
            self._rebackfill_requested.remove(room_id)
# ------------------------------------------------------------------ # Per-channel limit overrides # ------------------------------------------------------------------ #: Absolute bounds for per-channel overrides. MIN_CHANNEL_LIMIT = 50 MAX_CHANNEL_LIMIT = 1000
def set_channel_limit(self, room_id: str, limit: int) -> int:
    """Store a per-channel ``max_history`` override for *room_id*.

    *limit* is clamped to ``[MIN_CHANNEL_LIMIT, MAX_CHANNEL_LIMIT]``. If the
    stored value exceeds the previously effective limit (old override, or
    the global ``max_history``), a re-backfill is requested, so more cached
    messages are pulled on the next turn. The in-memory history is then
    re-trimmed in case the limit shrank.

    Returns:
        int: The clamped value actually stored.
    """
    with self._lock:
        before = self._channel_overrides.get(room_id, self.max_history)
        clamped = max(self.MIN_CHANNEL_LIMIT, min(limit, self.MAX_CHANNEL_LIMIT))
        self._channel_overrides[room_id] = clamped
        grew = clamped > before
        if grew:
            self._rebackfill_requested.add(room_id)
        self._trim(room_id)
    return clamped
def get_channel_limit(self, room_id: str) -> int:
    """Return the message limit currently in force for *room_id*.

    That is the per-channel override when one was set, else the global
    ``max_history``.
    """
    fallback = self.max_history
    with self._lock:
        effective = self._channel_overrides.get(room_id, fallback)
    return effective
def get_history_message_count(self, room_id: str) -> int:
    """Count the user/assistant turns cached in memory for *room_id*.

    This is the length of the in-memory sliding window; the system prompt is
    rendered fresh and never stored. It reflects what is cached locally,
    not the full Redis-persisted history. Side-effect-free, taken under
    ``self._lock``; ``0`` for an unknown or empty room.

    Args:
        room_id (str): The room/channel whose stored turn count is wanted.

    Returns:
        int: The number of cached user/assistant messages for the room.
    """
    with self._lock:
        stored = self._histories.get(room_id, ())
        return len(stored)
# ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ #: Max passes for :meth:`_drain_pending_persists` — re-checks after each #: await so a persist registered mid-drain is still waited on, bounded so a #: continuously-writing room cannot livelock the drain. _MAX_DRAIN_PASSES = 3 def _append_locked( self, room_id: str, role: str, content: str | list[dict[str, Any]], ) -> dict[str, Any]: """Mutation-only core of :meth:`append`: append + trim under the lock. Returns the appended message dict so the caller can persist it. Shared by the sync :meth:`append` (which fires a tracked background persist) and :meth:`append_async` (which awaits the persist on the loop). """ with self._lock: self._last_activity[room_id] = time.monotonic() history = self._histories.setdefault(room_id, []) msg = {"role": role, "content": content} history.append(msg) self._trim(room_id) return msg def _schedule_persist(self, room_id: str, coro: Any) -> None: """Fire *coro* as a tracked background persist task for *room_id*. The task is retained in ``_pending_persists`` (preventing GC and enabling :meth:`_drain_pending_persists`) and removed when it finishes. If there is no running event loop (e.g. called from a worker thread), the coroutine is closed cleanly instead — the async wrappers handle persistence themselves in that case. """ try: loop = asyncio.get_running_loop() except RuntimeError: coro.close() return task = loop.create_task(coro) with self._lock: self._pending_persists.setdefault(room_id, set()).add(task) task.add_done_callback( lambda t, rid=room_id: self._discard_persist(rid, t) ) def _discard_persist(self, room_id: str, task: asyncio.Task[Any]) -> None: """Done-callback: drop a finished persist task from the pending set. 
Deliberately does not inspect ``task.result()`` — the persist coroutines swallow their own exceptions, and re-raising here would only produce spurious unhandled-exception logs. """ with self._lock: tasks = self._pending_persists.get(room_id) if tasks is not None: tasks.discard(task) if not tasks: self._pending_persists.pop(room_id, None) async def _drain_pending_persists(self, room_id: str) -> None: """Await any in-flight fire-and-forget persists for *room_id*. Called before a reload so a sync-``append`` whose persist is still in flight lands in Redis (and updates ``_redis_versions``) before ``_load_from_redis`` overwrites memory. Re-checks the pending set after each wait: a new persist can be registered while we await, so a single snapshot is not enough to guarantee quiescence. Bounded by ``_MAX_DRAIN_PASSES`` so a continuously-writing room cannot livelock. The current task is excluded from the wait: a sync-path :meth:`_persist_full_history` runs *as* a tracked pending task and, on a CAS conflict, calls :meth:`_load_from_redis` (which drains) — without this exclusion the task would await itself and deadlock. """ try: current = asyncio.current_task() except RuntimeError: current = None for _ in range(self._MAX_DRAIN_PASSES): with self._lock: tasks = [ t for t in self._pending_persists.get(room_id, ()) if not t.done() and t is not current ] if not tasks: return await asyncio.gather(*tasks, return_exceptions=True) def _get_history_snapshot(self, room_id: str) -> list[dict[str, Any]]: """Return a deep copy of *room_id*'s history under the lock. Designed to be called via ``asyncio.to_thread`` so the lock acquisition and ``deepcopy`` never block the event loop. """ with self._lock: return copy.deepcopy(list(self._histories.get(room_id, []))) def _trim(self, room_id: str) -> None: """Keep only the last N messages for a room. Uses the per-channel override if set, otherwise falls back to the global ``max_history``. 
""" with self._lock: history = self._histories.get(room_id) limit = self._channel_overrides.get(room_id, self.max_history) if history and len(history) > limit: self._histories[room_id] = history[-limit:] async def _persist_append(self, room_id: str, msg: dict[str, Any]) -> None: """Persist a single appended message to the room's Redis list. Serializes *msg* to JSON and, in one Redis pipeline, ``RPUSH``-es it onto ``sg:conversation:{room_id}``, ``LTRIM``-s the list to the channel's effective limit, and ``INCR``-s the ``:ver`` version counter. The local mirror in ``self._redis_versions`` is only advanced when the returned version is exactly one past the snapshot taken under the lock; if another writer interleaved (the counter jumped further), the local version is left deliberately stale-low so the next :meth:`ensure_fresh_from_redis` reloads and self-heals — our appended row still survives in the list. Any failure is swallowed with a warning so a Redis hiccup never breaks the in-memory turn that already happened. Runs against Redis (pipeline ``RPUSH``/``LTRIM``/``INCR``). Called by the sync :meth:`append` (scheduled through :meth:`_schedule_persist` as a tracked background task) and awaited directly by :meth:`append_async`. Args: room_id (str): The room whose list and version counter are updated. msg (dict[str, Any]): The message dict (``role``/``content``) to push. """ key = self._conversation_key(room_id) ver_key = self._conversation_version_key(room_id) try: serialized = json.dumps(msg, default=str) limit = self.get_channel_limit(room_id) with self._lock: expected_prev = self._redis_versions.get(room_id, 0) pipe = self._redis.pipeline() pipe.rpush(key, serialized) pipe.ltrim(key, -limit, -1) pipe.incr(ver_key) results = await pipe.execute() new_ver = int(results[-1]) with self._lock: # Only adopt the new version when no other writer interleaved # (new_ver == expected_prev + 1). 
On an interleave the counter # jumped past us, so local memory does NOT reflect the other # writer's content; leave _redis_versions stale-low so the next # ensure_fresh_from_redis reloads and self-heals (our appended # row still survives in the list). if new_ver == expected_prev + 1: self._redis_versions[room_id] = new_ver logger.debug( "Persisted append to Redis list: %s (limit: %d, ver: %d)", key, limit, new_ver, extra={"channel_id": room_id}, ) except Exception as exc: logger.warning( "Failed to persist conversation append to Redis for channel %s: %s", room_id, exc, exc_info=True, ) async def _load_from_redis(self, room_id: str) -> list[dict[str, Any]]: """Hydrate a room's history from its Redis list into the local cache. Replaces the in-memory history for *room_id* with the full contents of ``sg:conversation:{room_id}``, deserializing each JSON row, and adopts the live ``:ver`` counter into ``self._redis_versions`` so subsequent freshness checks compare against the version actually loaded. First drains any in-flight local persists (via :meth:`_drain_pending_persists`) so a cold load or reload never overwrites memory with a Redis list that is missing a turn whose sync-append persist is still in flight — important for, e.g., a just-posted image. The list read and version read are issued concurrently with ``asyncio.gather``. On any error it logs a warning and returns an empty list, leaving the cache untouched. Runs against Redis (``LRANGE`` + ``GET``). Called by :meth:`get_messages_async` (cold cache), :meth:`ensure_fresh_from_redis` (when behind), and :meth:`_persist_full_history` (after a CAS conflict, to rebase the edit). Args: room_id (str): The room/channel whose history is being loaded. Returns: list[dict[str, Any]]: The hydrated message list, or ``[]`` on failure. """ # Land any in-flight local persists first so a cold-load (or a reload) # never overwrites memory with a Redis list that is missing a turn whose # sync-append persist is still in flight. 
await self._drain_pending_persists(room_id) key = self._conversation_key(room_id) ver_key = self._conversation_version_key(room_id) try: raw_history, remote_ver_raw = await asyncio.gather( self._redis.lrange(key, 0, -1), self._redis.get(ver_key), ) remote_ver = int(remote_ver_raw or 0) history = [] for item in raw_history: if isinstance(item, bytes): item = item.decode("utf-8") history.append(json.loads(item)) with self._lock: # Reconcile against any local append that landed after the # LRANGE snapshot above (a concurrent append_async whose persist # is still in flight) so a reload never evicts a just-appended # turn — the last-mile closure of the image-loss race that the # tracked-persist drain leaves open during _append_locked. history = _merge_local_tail( history, self._histories.get(room_id) or [] ) self._histories[room_id] = history self._last_activity[room_id] = time.monotonic() self._redis_versions[room_id] = remote_ver logger.debug( "Hydrated conversation history from Redis: %s (%d messages, ver=%d)", key, len(history), remote_ver, extra={"channel_id": room_id}, ) return history except Exception as exc: logger.warning( "Failed to load conversation history from Redis for channel %s: %s", room_id, exc, exc_info=True, ) return [] #: Max attempts for :meth:`_persist_full_history` when the optimistic #: compare-and-set keeps losing to concurrent writers (each retry reloads + #: re-applies the edit on the fresh base). _MAX_PERSIST_FULL_RETRIES = 3 #: Compare-and-set full-history rewrite. ARGV[1] is the version this process #: believes Redis is at; the script rebuilds the list ONLY if the live #: counter still equals it, otherwise it returns ``-1`` and touches nothing #: (so a concurrent other-process append is never clobbered by the #: DEL+rebuild). ARGV[2] is the message count, ARGV[3..] the serialized rows. 
_PERSIST_FULL_SCRIPT = """ local key = KEYS[1] local ver_key = KEYS[2] local expected = tonumber(ARGV[1]) local current = tonumber(redis.call('GET', ver_key)) or 0 if current ~= expected then return -1 end redis.call('DEL', key) local n = tonumber(ARGV[2]) for i = 1, n do redis.call('RPUSH', key, ARGV[i + 2]) end return redis.call('INCR', ver_key) """ async def _persist_full_history( self, room_id: str, reapply: Callable[[], bool] | None = None, ) -> None: """Overwrite conversation history in Redis with compare-and-set. Rewrites the Redis list from this process's in-memory history, but only when the live version counter still equals the version this process believes Redis is at (``expected_prev``). If another writer advanced the counter between the snapshot and the script, the Lua CAS returns ``-1`` and changes nothing — so a genuinely-concurrent other-process append is never clobbered by the DEL+rebuild. On a conflict we reload from Redis (pulling in the other writer's turns, which overwrites in-memory history and so discards our just-made edit), re-apply the edit on the fresh base via *reapply*, and retry — bounded by :attr:`_MAX_PERSIST_FULL_RETRIES`. *reapply* is the ``_*_locked`` mutation as a zero-arg callable returning whether it re-applied; when ``None`` or it can no longer find the message, we stop without clobbering. 
""" key = self._conversation_key(room_id) ver_key = self._conversation_version_key(room_id) try: for attempt in range(1, self._MAX_PERSIST_FULL_RETRIES + 1): with self._lock: history = list(self._histories.get(room_id, [])) expected_prev = self._redis_versions.get(room_id, 0) serialized_list = [json.dumps(msg, default=str) for msg in history] args = [ str(expected_prev), str(len(serialized_list)), *serialized_list, ] result = int( await self._redis.eval( self._PERSIST_FULL_SCRIPT, 2, key, ver_key, *args, ) ) if result >= 0: new_ver = result with self._lock: # CAS succeeded => no writer interleaved => memory # reflects new_ver (which == expected_prev + 1 here; the # guard is kept for symmetry with _persist_append). if new_ver == expected_prev + 1: self._redis_versions[room_id] = new_ver logger.debug( "Persisted full history to Redis: %s (%d messages, ver=%d)", key, len(serialized_list), new_ver, extra={"channel_id": room_id}, ) return # Version conflict: another writer advanced the counter between # our snapshot and the script. Do NOT clobber — reload to pull in # their turns, re-apply our edit on the fresh base, and retry. if attempt >= self._MAX_PERSIST_FULL_RETRIES: break logger.info( "Full-history persist for %s hit a version conflict " "(expected ver=%d); reloading and retrying (%d/%d)", room_id, expected_prev, attempt, self._MAX_PERSIST_FULL_RETRIES, ) await self._load_from_redis(room_id) if reapply is not None: reapplied = await asyncio.to_thread(reapply) if not reapplied: logger.warning( "Full-history persist for %s: the edited message is " "no longer present after reload; not re-applying", room_id, ) return logger.warning( "Full-history persist for %s gave up after %d version " "conflicts; the in-memory edit may not be persisted", room_id, self._MAX_PERSIST_FULL_RETRIES, ) except Exception as exc: logger.warning( "Failed to persist full history to Redis for channel %s: %s", room_id, exc, exc_info=True, )