conversation

Per-room conversation history manager backed by Redis.

Defines ConversationManager, which keeps an in-memory sliding window of user/assistant turns per room_id and mirrors it to Redis lists (under sg:conversation:{room_id} with a companion :ver version counter) so that the separate inference, agents, and consolidation workers all converge on the same history. It renders the dynamic system prompt through a PromptRenderer, guards its maps with a threading.RLock, and uses an optimistic compare-and-set Lua script plus a drainable set of background persist tasks to keep concurrent writers from clobbering each other.
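The optimistic compare-and-set idea mentioned above can be illustrated without Redis. This is a minimal in-memory sketch, not the module's Lua script: VersionedList and compare_and_set are hypothetical names, and the rule that every successful write bumps the version by one is an assumption.

```python
class VersionedList:
    """In-memory stand-in for a Redis list plus its companion version counter."""

    def __init__(self) -> None:
        self.items: list = []
        self.version = 0

    def compare_and_set(self, expected_version: int, new_items: list) -> bool:
        # In Redis, this check-and-write would have to run atomically,
        # which is what a Lua script provides.
        if self.version != expected_version:
            return False  # another writer got there first; caller must reload
        self.items = list(new_items)
        self.version += 1  # assumption: each successful write bumps the counter
        return True
```

A writer that loses the race sees False and is expected to reload before retrying, rather than overwriting the newer history.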

class conversation.ConversationManager(prompt_renderer, max_history=100, redis=None)[source]

Bases: object

Maintains per-room conversation histories.

Each room has its own message list. A PromptRenderer produces the system prompt dynamically from a Jinja2 template so that it can include room-specific and tool-specific context.

A sliding-window strategy keeps at most max_history user/assistant messages per room to avoid unbounded memory growth.

Concurrency: The per-channel MessageQueue normally serializes inference for a given (platform, channel_id), so in the common case each room_id has a single writer. A threading.RLock still protects _histories and the related maps when multiple coroutines touch the same room (e.g. tools) or when different channels are updated concurrently.
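As a rough illustration of the sliding window and the lock described above, here is a minimal sketch. SlidingHistory and snapshot are hypothetical names, the message dict shape is an assumption, and Redis mirroring is left out entirely.

```python
import threading
from collections import defaultdict
from typing import Any


class SlidingHistory:
    """Toy per-room history store; not the real ConversationManager."""

    def __init__(self, max_history: int = 100) -> None:
        self._max_history = max_history
        self._histories: dict[str, list[dict[str, Any]]] = defaultdict(list)
        # RLock so a method holding the lock can call another locked method.
        self._lock = threading.RLock()

    def append(self, room_id: str, role: str, content: Any) -> None:
        with self._lock:
            history = self._histories[room_id]
            history.append({"role": role, "content": content})
            # Drop the oldest entries once the window is exceeded.
            overflow = len(history) - self._max_history
            if overflow > 0:
                del history[:overflow]

    def snapshot(self, room_id: str) -> list[dict[str, Any]]:
        with self._lock:
            # Copy the list so callers cannot mutate the stored window.
            return list(self._histories.get(room_id, []))
```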

__init__(prompt_renderer, max_history=100, redis=None)[source]

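Initialize the manager.

Parameters:
  • prompt_renderer (PromptRenderer) – Renderer that produces the dynamic system prompt from a Jinja2 template.

  • max_history (int) – Maximum number of user/assistant messages kept per room before the sliding window trims older ones. Defaults to 100.

  • redis (Any | None) – Optional Redis client used to mirror histories across workers. When None, Redis persistence and refresh are skipped.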

Return type:

None

append(room_id, role, content)[source]

Add a message to room_id’s history and trim if needed.

content may be a plain string or a list of OpenRouter multimodal content parts. The Redis persist is fired as a tracked background task (drainable before a reload); callers that need the persist to be durable before they proceed should use append_async(), which awaits it.

Return type:

None
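The "tracked background task, drainable before a reload" pattern can be sketched as follows. PersistTracker, schedule, and drain are hypothetical names, and fake_persist stands in for the real Redis write; the actual bookkeeping in ConversationManager may differ.

```python
import asyncio


class PersistTracker:
    """Hypothetical helper: tracks fire-and-forget persist tasks per room."""

    def __init__(self) -> None:
        self._pending: dict[str, set[asyncio.Task]] = {}

    def schedule(self, room_id: str, coro) -> asyncio.Task:
        # Must be called from a thread that has a running event loop.
        task = asyncio.get_running_loop().create_task(coro)
        tasks = self._pending.setdefault(room_id, set())
        tasks.add(task)
        # Remove the task from the set once it finishes.
        task.add_done_callback(tasks.discard)
        return task

    async def drain(self, room_id: str) -> None:
        # Wait for every persist that is still in flight for this room.
        tasks = list(self._pending.get(room_id, ()))
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)


async def demo() -> list[str]:
    persisted: list[str] = []

    async def fake_persist(item: str) -> None:
        await asyncio.sleep(0.01)  # stands in for a Redis RPUSH
        persisted.append(item)

    tracker = PersistTracker()
    tracker.schedule("room", fake_persist("hello"))
    await tracker.drain("room")
    return persisted
```

Keeping the tasks in a set also holds a strong reference to them, so a pending persist is not garbage-collected before it completes.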

last_history_role(room_id)[source]

Return the role of the last stored message for room_id.

History entries are user or assistant turns; the system prompt is not stored and is only added by get_messages(). Returns None when the room has no stored history.

Return type:

str | None

Parameters:

room_id (str)

get_messages(room_id, room_context=None)[source]

Return the full message list for room_id, including the system prompt.

room_context is forwarded to the PromptRenderer so the Jinja2 template can reference room-level variables such as room_name, room_id, sender, etc.

Return type:

list[dict[str, Any]]

async ensure_fresh_from_redis(room_id)[source]

Re-hydrate in-memory history when another worker has advanced Redis.

Cheaply detects cross-worker divergence by comparing the locally mirrored version against the remote counter, and only does the heavier full reload when this process is genuinely behind. Because the five microservices share one Redis-backed history, an append made by (say) the agents worker must become visible to the inference worker before it next renders the prompt; this is the synchronisation point that makes that happen.

First drains any in-flight fire-and-forget persists for this room (so our own un-persisted writes are reflected in self._redis_versions and a reload cannot drop a just-appended turn), then reads sg:conversation:{room_id}:ver via GET and, if the remote version exceeds the local one, calls _load_from_redis() to overwrite the in-memory cache. A failed version GET is logged at debug and treated as “no refresh needed”. No-ops when Redis is not configured.

Called by get_messages_async() (the cache-warm path) and by the message processor before assembling context (message_processor/processor.py around line 2000).

Parameters:

room_id (str) – The room/channel whose history may need re-hydrating.

Return type:

None
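The version check at the heart of this method can be sketched as below. FakeRedis and needs_reload are hypothetical names used only for illustration, and treating a missing counter as version 0 is an assumption; the drain step and the actual reload are omitted.

```python
import asyncio


class FakeRedis:
    """In-memory stand-in exposing only an async get(); not a real client."""

    def __init__(self, data: dict[str, str]) -> None:
        self.data = data

    async def get(self, key: str):
        return self.data.get(key)


async def needs_reload(redis, room_id: str, local_version: int) -> bool:
    """Return True when the remote version counter is ahead of ours."""
    if redis is None:
        return False  # Redis not configured: nothing to refresh from
    try:
        raw = await redis.get(f"sg:conversation:{room_id}:ver")
    except Exception:
        return False  # a failed GET is treated as "no refresh needed"
    # Assumption: a missing counter means version 0.
    remote_version = int(raw) if raw is not None else 0
    return remote_version > local_version
```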

async get_messages_async(room_id, room_context=None)[source]

Return the full message list for room_id, including the system prompt.

Same as get_messages() but runs Jinja2 rendering and the lock-held deepcopy in a thread pool so the event loop is not blocked by CPU-bound work.

Return type:

list[dict[str, Any]]

update_message(room_id, message_id, new_content)[source]

Replace the content of an existing history entry by message ID.

Scans backward through room_id’s history for an entry whose text contains [Message ID: {message_id}] and replaces its content. Returns True if a match was found.

Return type:

bool
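The backward scan can be sketched as a standalone function over a plain list. This version only handles string content; how the real method treats multimodal content parts is not stated here, so skipping them is an assumption.

```python
from typing import Any


def update_message(history: list[dict[str, Any]], message_id: str, new_content: str) -> bool:
    """Replace the newest entry whose text carries the given message-ID marker."""
    marker = f"[Message ID: {message_id}]"
    # Newest entries are at the end, so scan in reverse.
    for entry in reversed(history):
        content = entry.get("content")
        # Assumption: non-string (multimodal) content is skipped.
        if isinstance(content, str) and marker in content:
            entry["content"] = new_content
            return True
    return False
```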

mark_deleted(room_id, message_id, deleted_at_iso)[source]

Inject a [deleted at TIMESTAMP] tag into an existing entry.

The original content is preserved so the bot retains full context. Returns True if the message was found.

Return type:

bool

Parameters:
  • room_id (str)

  • message_id (str)

  • deleted_at_iso (str)

patch_reactions(room_id, message_id, reactions_str)[source]

Update the [Reactions: ...] tag on a history entry.

Strips any existing reaction tag and, if reactions_str is non-empty, inserts a new one after the message-ID marker. Returns True if the entry was found.

Return type:

bool

Parameters:
  • room_id (str)

  • message_id (str)

  • reactions_str (str)
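The strip-then-insert behaviour might look like the following sketch on a single text entry. patch_reactions_text is a hypothetical helper, and the exact spacing around the tags is an assumption; only the [Message ID: ...] and [Reactions: ...] markers come from the description above.

```python
import re

_REACTIONS_RE = re.compile(r"\s*\[Reactions: [^\]]*\]")
_MESSAGE_ID_RE = re.compile(r"\[Message ID: [^\]]*\]")


def patch_reactions_text(text: str, reactions_str: str) -> str:
    """Return text with its reaction tag replaced, added, or removed."""
    # Remove any existing reaction tag (and the whitespace before it).
    stripped = _REACTIONS_RE.sub("", text)
    if not reactions_str:
        return stripped
    match = _MESSAGE_ID_RE.search(stripped)
    if match is None:
        return stripped  # no message-ID marker to anchor the tag to
    tag = f" [Reactions: {reactions_str}]"
    # Insert the new tag directly after the message-ID marker.
    return stripped[: match.end()] + tag + stripped[match.end():]
```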

async append_async(room_id, role, content)[source]

Async version of append().

Runs the in-memory mutation in a thread, then awaits the Redis persist on the event loop. Awaiting is essential: the previous asyncio.to_thread(self.append, ...) ran append in a worker thread where asyncio.get_running_loop() raises, so the persist was silently skipped and the turn (e.g. a user’s image) never reached Redis — a concurrent reload would then drop it from in-memory history.

The persist is registered as a tracked pending task (exactly as in the sync append()) before being awaited, so a concurrent reader’s _drain_pending_persists() waits for this RPUSH to commit before it reloads. Awaiting an untracked persist defeated that guard: a reload landing during the persist window would read a Redis list still missing the just-appended turn and replace the in-memory history with a copy that lacked it. The turn still reached Redis, but a later full-history rewrite (an edit or a reaction) then persisted the turn-less snapshot and lost it for good. Because _preprocess_and_record records the user’s image turn through this path, this path was the one silently losing images.

Return type:

None
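The failure mode described above is easy to reproduce in isolation: asyncio.get_running_loop() succeeds on the event-loop thread but raises RuntimeError inside a thread started by asyncio.to_thread. The function names below are illustrative only.

```python
import asyncio


def probe_loop_in_current_thread() -> str:
    """Report whether asyncio sees a running loop in the calling thread."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return "no running loop"
    return "running loop"


async def main() -> tuple[str, str]:
    # Called directly from a coroutine: the loop is visible.
    on_loop = probe_loop_in_current_thread()
    # Called in a worker thread: there is no running loop there.
    in_worker = await asyncio.to_thread(probe_loop_in_current_thread)
    return on_loop, in_worker
```

This is why code that schedules a task from inside the worker thread never gets to schedule it, and why the persist has to be awaited back on the loop.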

async update_message_async(room_id, message_id, new_content)[source]

Async version of update_message().

Mutates in a thread, then awaits the full-history persist on the loop (the sync path’s in-thread persist scheduling would otherwise be silently skipped — there is no running loop in the worker thread).

Return type:

bool

async mark_deleted_async(room_id, message_id, deleted_at_iso)[source]

Async version of mark_deleted() — mutate in a thread, then await the full-history persist on the loop.

Return type:

bool

Parameters:
  • room_id (str)

  • message_id (str)

  • deleted_at_iso (str)

async patch_reactions_async(room_id, message_id, reactions_str)[source]

Async version of patch_reactions() — mutate in a thread, then await the full-history persist on the loop.

Return type:

bool

Parameters:
  • room_id (str)

  • message_id (str)

  • reactions_str (str)

reap_stale(max_idle_seconds=3600.0)[source]

Remove histories for channels idle longer than max_idle_seconds.

Returns the number of channels reaped.

Return type:

int

Parameters:

max_idle_seconds (float)
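One way idle reaping could work is sketched below. It assumes a per-room last-activity timestamp is kept alongside the histories; the last_active map and the injectable now parameter are hypothetical and exist only to make the example testable.

```python
import time
from typing import Optional


def reap_stale(
    last_active: dict[str, float],
    histories: dict[str, list],
    max_idle_seconds: float = 3600.0,
    now: Optional[float] = None,
) -> int:
    """Drop rooms idle longer than max_idle_seconds; return how many were dropped."""
    now = time.monotonic() if now is None else now
    # Collect first, then delete, to avoid mutating a dict while iterating it.
    stale = [room for room, ts in last_active.items() if now - ts > max_idle_seconds]
    for room in stale:
        histories.pop(room, None)
        last_active.pop(room, None)
    return len(stale)
```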

clear(room_id)[source]

Wipe a room’s conversation history from memory and Redis.

Drops the in-memory history for room_id under the lock, then (if Redis is configured) schedules a fire-and-forget DEL of the backing sg:conversation:{room_id} list via loop.create_task. When there is no running event loop, the Redis delete is skipped and a warning is logged, so only the in-memory history is wiped. Note that this does not reset the :ver version counter, so a later writer’s compare-and-set still sees a monotonic version.

Called by the message processor’s reset/clear paths (message_processor/processor.py around lines 869, 903, and 2034).

Parameters:

room_id (str) – The room/channel whose history is being cleared.

Return type:

None
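The "schedule the delete if a loop is running, otherwise warn and skip" branch can be sketched like this. FakeRedis and schedule_delete are hypothetical stand-ins; a real client and the manager's own logging will differ.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FakeRedis:
    """In-memory stand-in exposing only an async delete()."""

    def __init__(self) -> None:
        self.store = {"sg:conversation:room": ["turn"]}

    async def delete(self, key: str) -> None:
        self.store.pop(key, None)


def schedule_delete(redis, key: str) -> bool:
    """Schedule a fire-and-forget DEL; return False if it had to be skipped."""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        logger.warning("No running event loop; skipping Redis delete of %s", key)
        return False
    # Not awaited: the caller continues while the delete runs on the loop.
    loop.create_task(redis.delete(key))
    return True
```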

is_rebackfill_requested(room_id)[source]

Report whether room_id is flagged to re-backfill more history.

Returns the pending state of the re-backfill marker that set_channel_limit() raises when a channel’s history limit is increased, signalling that the next turn should pull additional older messages from the platform cache to fill the now-larger window. A simple lock-held membership test with no side effects.

Called by the message processor when deciding whether to backfill (message_processor/processor.py around line 2029).

Parameters:

room_id (str) – The room/channel to check.

Returns:

True if a re-backfill is pending for the room.

Return type:

bool

discard_rebackfill_request(room_id)[source]

Clear the pending re-backfill marker for room_id.

Removes the room from the re-backfill set under the lock once the backfill it requested has been carried out, so the work is not repeated on the next turn. Idempotent — discarding an absent room is a no-op.

Called after a backfill completes by the message processor (message_processor/processor.py around line 2033) and the history backfill helper (message_processor/history_backfill.py around line 104).

Parameters:

room_id (str) – The room/channel whose marker is being cleared.

Return type:

None

MIN_CHANNEL_LIMIT = 50

Lower bound for per-channel max_history overrides.

MAX_CHANNEL_LIMIT = 1000

Upper bound for per-channel max_history overrides.

set_channel_limit(room_id, limit)[source]

Set a per-channel override for max_history.

The value is clamped to [MIN_CHANNEL_LIMIT, MAX_CHANNEL_LIMIT]. Returns the clamped value actually stored.

Return type:

int

get_channel_limit(room_id)[source]

Return the effective message limit for room_id.

Uses the per-channel override if set, otherwise the global default.

Return type:

int

Parameters:

room_id (str)
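The clamping, the fallback to the global default, and the re-backfill marker raised on an increase can be sketched together. ChannelLimits and its attribute names are hypothetical, and comparing against the previous effective limit to detect an increase is an assumption.

```python
MIN_CHANNEL_LIMIT = 50
MAX_CHANNEL_LIMIT = 1000


class ChannelLimits:
    """Toy model of per-channel limit overrides; not the real implementation."""

    def __init__(self, default_limit: int = 100) -> None:
        self._default = default_limit
        self._overrides: dict[str, int] = {}
        self._rebackfill: set[str] = set()

    def get_channel_limit(self, room_id: str) -> int:
        # Per-channel override if set, otherwise the global default.
        return self._overrides.get(room_id, self._default)

    def set_channel_limit(self, room_id: str, limit: int) -> int:
        clamped = max(MIN_CHANNEL_LIMIT, min(MAX_CHANNEL_LIMIT, limit))
        # Assumption: an increase over the effective limit requests a re-backfill.
        if clamped > self.get_channel_limit(room_id):
            self._rebackfill.add(room_id)
        self._overrides[room_id] = clamped
        return clamped

    def is_rebackfill_requested(self, room_id: str) -> bool:
        return room_id in self._rebackfill
```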

get_history_message_count(room_id)[source]

Return how many user/assistant turns are currently stored for room_id.

Reports the length of the in-memory sliding window (excluding the system prompt, which is rendered fresh and never stored), so the value reflects what is presently cached rather than the full Redis-persisted history. A lock-held read with no side effects; returns 0 for an unknown or empty room.

Called by prompt-context assembly to size/decide backfill (prompt_context.py around line 1985).

Parameters:

room_id (str) – The room/channel whose stored turn count is wanted.

Returns:

The number of cached user/assistant messages for the room.

Return type:

int