"""Per-room conversation history manager."""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from prompt_renderer import PromptRenderer
logger = logging.getLogger(__name__)
[docs]
class ConversationManager:
"""Maintains per-room conversation histories.
Each room has its own message list. A :class:`PromptRenderer` produces
the system prompt dynamically from a Jinja2 template so that it can
include room-specific and tool-specific context.
A sliding-window strategy keeps at most *max_history* user/assistant
messages per room to avoid unbounded memory growth.
"""
[docs]
def __init__(
self,
prompt_renderer: PromptRenderer,
max_history: int = 100,
) -> None:
"""Initialize the instance.
Args:
prompt_renderer (PromptRenderer): The prompt renderer value.
max_history (int): The max history value.
"""
self.prompt_renderer = prompt_renderer
self.max_history = max_history
# room_id -> list of {"role": ..., "content": ...}
# content may be a plain string or a list of multimodal
# content parts (images, audio, video, files).
self._histories: dict[str, list[dict[str, Any]]] = {}
# Per-channel overrides for max_history (set via tool).
self._channel_overrides: dict[str, int] = {}
# Channels that need re-backfilling after a limit increase.
self._rebackfill_requested: set[str] = set()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
[docs]
def append(
self,
room_id: str,
role: str,
content: str | list[dict[str, Any]],
) -> None:
"""Add a message to *room_id*'s history and trim if needed.
*content* may be a plain string or a list of OpenRouter
multimodal content parts.
"""
history = self._histories.setdefault(room_id, [])
history.append({"role": role, "content": content})
self._trim(room_id)
[docs]
def get_messages(
self,
room_id: str,
room_context: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
"""Return the full message list for *room_id*, including the system prompt.
*room_context* is forwarded to the :class:`PromptRenderer` so the
Jinja2 template can reference room-level variables such as
``room_name``, ``room_id``, ``sender``, etc.
"""
messages: list[dict[str, Any]] = []
system_prompt = self.prompt_renderer.render(room_context)
if system_prompt.strip():
messages.append({"role": "system", "content": system_prompt})
messages.extend(self._histories.get(room_id, []))
return messages
[docs]
async def get_messages_async(
self,
room_id: str,
room_context: dict[str, Any] | None = None,
) -> list[dict[str, Any]]:
"""Return the full message list for *room_id*, including the system prompt.
Same as :meth:`get_messages` but runs Jinja2 rendering in a thread
pool so the event loop is not blocked by CPU-bound template work.
"""
messages: list[dict[str, Any]] = []
system_prompt = await asyncio.to_thread(
self.prompt_renderer.render, room_context,
)
if system_prompt.strip():
messages.append({"role": "system", "content": system_prompt})
messages.extend(self._histories.get(room_id, []))
return messages
[docs]
def update_message(
self,
room_id: str,
message_id: str,
new_content: str | list[dict[str, Any]],
) -> bool:
"""Replace the content of an existing history entry by message ID.
Scans backward through *room_id*'s history for an entry whose
text contains ``[Message ID: {message_id}]`` and replaces its
content. Returns ``True`` if a match was found.
"""
history = self._histories.get(room_id)
if not history:
return False
marker = f"[Message ID: {message_id}]"
for entry in reversed(history):
content = entry["content"]
if isinstance(content, str):
if marker in content:
entry["content"] = new_content
return True
elif isinstance(content, list):
for part in content:
if part.get("type") == "text" and marker in part.get("text", ""):
entry["content"] = new_content
return True
return False
[docs]
def mark_deleted(
self,
room_id: str,
message_id: str,
deleted_at_iso: str,
) -> bool:
"""Inject a ``[deleted at TIMESTAMP]`` tag into an existing entry.
The original content is preserved so the bot retains full context.
Returns ``True`` if the message was found.
"""
history = self._histories.get(room_id)
if not history:
return False
marker = f"[Message ID: {message_id}]"
tag = f" [deleted at {deleted_at_iso}]"
for entry in reversed(history):
content = entry["content"]
if isinstance(content, str):
if marker in content and tag not in content:
entry["content"] = content.replace(
marker, marker + tag, 1,
)
return True
elif isinstance(content, list):
for part in content:
text = part.get("text", "")
if (
part.get("type") == "text"
and marker in text
and tag not in text
):
part["text"] = text.replace(
marker, marker + tag, 1,
)
return True
return False
[docs]
def clear(self, room_id: str) -> None:
"""Wipe conversation history for a room."""
self._histories.pop(room_id, None)
# ------------------------------------------------------------------
# Per-channel limit overrides
# ------------------------------------------------------------------
#: Absolute bounds for per-channel overrides.
MIN_CHANNEL_LIMIT = 50
MAX_CHANNEL_LIMIT = 1000
[docs]
def set_channel_limit(self, room_id: str, limit: int) -> int:
"""Set a per-channel override for *max_history*.
The value is clamped to ``[MIN_CHANNEL_LIMIT, MAX_CHANNEL_LIMIT]``.
Returns the clamped value actually stored.
"""
clamped = max(self.MIN_CHANNEL_LIMIT, min(limit, self.MAX_CHANNEL_LIMIT))
current_effective = self.get_channel_limit(room_id)
self._channel_overrides[room_id] = clamped
# If the limit was raised, request a re-backfill so more
# messages are loaded from the cache on the next turn.
if clamped > current_effective:
self._rebackfill_requested.add(room_id)
# Re-trim with the new limit in case it shrank.
self._trim(room_id)
return clamped
[docs]
def get_channel_limit(self, room_id: str) -> int:
"""Return the effective message limit for *room_id*.
Uses the per-channel override if set, otherwise the global default.
"""
return self._channel_overrides.get(room_id, self.max_history)
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _trim(self, room_id: str) -> None:
"""Keep only the last N messages for a room.
Uses the per-channel override if set, otherwise falls back to
the global ``max_history``.
"""
history = self._histories.get(room_id)
limit = self._channel_overrides.get(room_id, self.max_history)
if history and len(history) > limit:
self._histories[room_id] = history[-limit:]