Source code for conversation

"""Per-room conversation history manager."""

from __future__ import annotations

import asyncio
import logging
from typing import Any

from prompt_renderer import PromptRenderer

logger = logging.getLogger(__name__)


[docs] class ConversationManager: """Maintains per-room conversation histories. Each room has its own message list. A :class:`PromptRenderer` produces the system prompt dynamically from a Jinja2 template so that it can include room-specific and tool-specific context. A sliding-window strategy keeps at most *max_history* user/assistant messages per room to avoid unbounded memory growth. """
[docs] def __init__( self, prompt_renderer: PromptRenderer, max_history: int = 100, ) -> None: """Initialize the instance. Args: prompt_renderer (PromptRenderer): The prompt renderer value. max_history (int): The max history value. """ self.prompt_renderer = prompt_renderer self.max_history = max_history # room_id -> list of {"role": ..., "content": ...} # content may be a plain string or a list of multimodal # content parts (images, audio, video, files). self._histories: dict[str, list[dict[str, Any]]] = {} # Per-channel overrides for max_history (set via tool). self._channel_overrides: dict[str, int] = {} # Channels that need re-backfilling after a limit increase. self._rebackfill_requested: set[str] = set()
# ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------
[docs] def append( self, room_id: str, role: str, content: str | list[dict[str, Any]], ) -> None: """Add a message to *room_id*'s history and trim if needed. *content* may be a plain string or a list of OpenRouter multimodal content parts. """ history = self._histories.setdefault(room_id, []) history.append({"role": role, "content": content}) self._trim(room_id)
[docs] def get_messages( self, room_id: str, room_context: dict[str, Any] | None = None, ) -> list[dict[str, Any]]: """Return the full message list for *room_id*, including the system prompt. *room_context* is forwarded to the :class:`PromptRenderer` so the Jinja2 template can reference room-level variables such as ``room_name``, ``room_id``, ``sender``, etc. """ messages: list[dict[str, Any]] = [] system_prompt = self.prompt_renderer.render(room_context) if system_prompt.strip(): messages.append({"role": "system", "content": system_prompt}) messages.extend(self._histories.get(room_id, [])) return messages
[docs] async def get_messages_async( self, room_id: str, room_context: dict[str, Any] | None = None, ) -> list[dict[str, Any]]: """Return the full message list for *room_id*, including the system prompt. Same as :meth:`get_messages` but runs Jinja2 rendering in a thread pool so the event loop is not blocked by CPU-bound template work. """ messages: list[dict[str, Any]] = [] system_prompt = await asyncio.to_thread( self.prompt_renderer.render, room_context, ) if system_prompt.strip(): messages.append({"role": "system", "content": system_prompt}) messages.extend(self._histories.get(room_id, [])) return messages
[docs] def update_message( self, room_id: str, message_id: str, new_content: str | list[dict[str, Any]], ) -> bool: """Replace the content of an existing history entry by message ID. Scans backward through *room_id*'s history for an entry whose text contains ``[Message ID: {message_id}]`` and replaces its content. Returns ``True`` if a match was found. """ history = self._histories.get(room_id) if not history: return False marker = f"[Message ID: {message_id}]" for entry in reversed(history): content = entry["content"] if isinstance(content, str): if marker in content: entry["content"] = new_content return True elif isinstance(content, list): for part in content: if part.get("type") == "text" and marker in part.get("text", ""): entry["content"] = new_content return True return False
[docs] def mark_deleted( self, room_id: str, message_id: str, deleted_at_iso: str, ) -> bool: """Inject a ``[deleted at TIMESTAMP]`` tag into an existing entry. The original content is preserved so the bot retains full context. Returns ``True`` if the message was found. """ history = self._histories.get(room_id) if not history: return False marker = f"[Message ID: {message_id}]" tag = f" [deleted at {deleted_at_iso}]" for entry in reversed(history): content = entry["content"] if isinstance(content, str): if marker in content and tag not in content: entry["content"] = content.replace( marker, marker + tag, 1, ) return True elif isinstance(content, list): for part in content: text = part.get("text", "") if ( part.get("type") == "text" and marker in text and tag not in text ): part["text"] = text.replace( marker, marker + tag, 1, ) return True return False
[docs] def clear(self, room_id: str) -> None: """Wipe conversation history for a room.""" self._histories.pop(room_id, None)
# ------------------------------------------------------------------ # Per-channel limit overrides # ------------------------------------------------------------------ #: Absolute bounds for per-channel overrides. MIN_CHANNEL_LIMIT = 50 MAX_CHANNEL_LIMIT = 1000
[docs] def set_channel_limit(self, room_id: str, limit: int) -> int: """Set a per-channel override for *max_history*. The value is clamped to ``[MIN_CHANNEL_LIMIT, MAX_CHANNEL_LIMIT]``. Returns the clamped value actually stored. """ clamped = max(self.MIN_CHANNEL_LIMIT, min(limit, self.MAX_CHANNEL_LIMIT)) current_effective = self.get_channel_limit(room_id) self._channel_overrides[room_id] = clamped # If the limit was raised, request a re-backfill so more # messages are loaded from the cache on the next turn. if clamped > current_effective: self._rebackfill_requested.add(room_id) # Re-trim with the new limit in case it shrank. self._trim(room_id) return clamped
[docs] def get_channel_limit(self, room_id: str) -> int: """Return the effective message limit for *room_id*. Uses the per-channel override if set, otherwise the global default. """ return self._channel_overrides.get(room_id, self.max_history)
# ------------------------------------------------------------------ # Internal # ------------------------------------------------------------------ def _trim(self, room_id: str) -> None: """Keep only the last N messages for a room. Uses the per-channel override if set, otherwise falls back to the global ``max_history``. """ history = self._histories.get(room_id) limit = self._channel_overrides.get(room_id, self.max_history) if history and len(history) > limit: self._histories[room_id] = history[-limit:]