# Source code for message_processor.processor

"""Central message processor shared across all platforms.

Receives :class:`~platforms.base.IncomingMessage` instances from any
:class:`~platforms.base.PlatformAdapter`, manages conversation history,
calls the LLM via :class:`~openrouter_client.OpenRouterClient`, and
sends the reply back through the originating platform.
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import random
import time
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Awaitable, Callable

import numpy as np

from config import Config
from conversation import ConversationManager
from message_queue import MessageBatch
from openrouter_client import OpenRouterClient
from platforms.base import Attachment, IncomingMessage, PlatformAdapter
from platforms.media_common import media_to_content_parts
from prompt_context import PromptContextBuilder
from response_postprocessor import (
    extract_status_tags,
    llm_filter_response,
    postprocess_intermediate_response,
    postprocess_response,
)
import feature_toggles
from tool_context import ToolContext
from url_content_extractor import extract_all_url_content

from .assistant_history import _assistant_history_content_with_sent_files
from .population_gate import _POPULATION_EXEMPT_GUILDS, _POPULATION_LIMIT
from .text_attachments import _is_text_decodable
from .user_message_format import format_user_content

if TYPE_CHECKING:
    from classifiers.vector_classifier import VectorClassifier
    from embedding_queue import EmbeddingBatchQueue
    from knowledge_graph import KnowledgeGraphManager
    from message_cache import MessageCache
    from rag_system.auto_search import RAGAutoSearchManager
    from status_manager import StatusManager
    from task_manager import TaskManager
    from threadweave import ThreadweaveManager
    from web_search_context import WebSearchContextManager

logger = logging.getLogger(__name__)


class MessageProcessor:
    """Platform-agnostic message handler.

    Receives :class:`~platforms.base.IncomingMessage` instances from any
    platform adapter, manages conversation history, calls the LLM, and
    sends the reply back through the originating platform.

    Parameters
    ----------
    config:
        Global bot configuration (forwarded to the context builder).
    conversation_manager:
        Manages per-channel conversation histories.
    openrouter:
        The OpenRouter API client for LLM completions.
    message_cache:
        Optional Redis-backed message cache.  When provided, every
        message the bot sees is logged with an embedding vector.
    """
    def __init__(
        self,
        config: Config,
        conversation_manager: ConversationManager,
        openrouter: OpenRouterClient,
        message_cache: MessageCache | None = None,
        kg_manager: KnowledgeGraphManager | None = None,
        auto_search: RAGAutoSearchManager | None = None,
        task_manager: TaskManager | None = None,
        classifier: VectorClassifier | None = None,
        threadweave: ThreadweaveManager | None = None,
        embedding_queue: EmbeddingBatchQueue | None = None,
        status_manager: StatusManager | None = None,
        web_search: WebSearchContextManager | None = None,
    ) -> None:
        """Initialize the processor and wire up all optional subsystems.

        Args:
            config: Bot configuration object.
            conversation_manager: Per-channel conversation history store.
            openrouter: LLM / embedding API client.
            message_cache: Optional Redis-backed message log.
            kg_manager: Optional knowledge-graph manager.
            auto_search: Optional RAG auto-search manager.
            task_manager: Optional scheduled-task manager.
            classifier: Optional vector classifier (proactive gating).
            threadweave: Optional threadweave manager.
            embedding_queue: Optional batch queue for deferred embeddings.
            status_manager: Optional status manager.
            web_search: Optional web-search context manager.
        """
        self.config = config
        self.conversation = conversation_manager
        self.openrouter = openrouter
        self.message_cache = message_cache
        self.kg_manager = kg_manager
        self._auto_search = auto_search
        self._task_manager = task_manager
        self._classifier = classifier
        self._threadweave = threadweave
        self._embedding_queue = embedding_queue
        self._status_manager = status_manager
        self._web_search = web_search
        # Channels whose history has already been backfilled this run.
        self._backfilled_channels: set[str] = set()
        # Builds the system prompt / context window for each LLM call.
        self._ctx_builder = PromptContextBuilder(
            config,
            kg_manager=kg_manager,
            threadweave_manager=threadweave,
            status_manager=status_manager,
            message_cache=message_cache,
            task_manager=task_manager,
            conversation_manager=conversation_manager,
            openrouter_client=self.openrouter,
        )
        # 💀 Cadence Post-Processor — code-level text degradation.
        # Optional dependency: silently disabled when the module is absent.
        self._cadence_processor = None
        try:
            from cadence_refiner import CadencePostProcessor
            self._cadence_processor = CadencePostProcessor()
        except ImportError:
            logger.debug("cadence_refiner not available")
# ------------------------------------------------------------------ # Pending-response tracking (Redis) # ------------------------------------------------------------------ _PENDING_PREFIX = "pending_response:" _PENDING_TTL = 3600 # 1 hour safety net async def _mark_pending(self, msg: IncomingMessage) -> None: """Write a Redis marker for an in-flight response. The marker is removed by :meth:`_clear_pending` once the reply has been sent. If the bot crashes in between, the marker survives and is picked up by :meth:`recover_pending_responses` on the next startup. """ if self.message_cache is None: return key = f"{self._PENDING_PREFIX}{msg.platform}:{msg.channel_id}" mapping = { "platform": msg.platform, "channel_id": msg.channel_id, "user_id": msg.user_id, "user_name": msg.user_name, "text": msg.text or "", "message_id": msg.message_id or "", "reply_to_id": msg.reply_to_id or "", "channel_name": msg.channel_name or "", "is_addressed": "1" if msg.is_addressed else "0", "timestamp": msg.timestamp.isoformat(), "started_at": datetime.now(timezone.utc).isoformat(), } try: pipe = self.message_cache.redis_client.pipeline() pipe.hset(key, mapping=mapping) pipe.expire(key, self._PENDING_TTL) await pipe.execute() except Exception: logger.debug("Failed to mark pending response", exc_info=True) async def _clear_pending(self, msg: IncomingMessage) -> None: """Remove the in-flight marker after a response completes.""" if self.message_cache is None: return key = f"{self._PENDING_PREFIX}{msg.platform}:{msg.channel_id}" try: await self.message_cache.redis_client.delete(key) except Exception: logger.debug("Failed to clear pending response", exc_info=True)
    async def recover_pending_responses(
        self,
        enqueue_callback: Callable[[IncomingMessage, PlatformAdapter], Awaitable[None]],
        adapters: list[PlatformAdapter],
    ) -> int:
        """Re-enqueue messages whose responses were interrupted by a restart.

        Called once during startup after all platform adapters are running.

        Args:
            enqueue_callback: Coroutine used to re-queue each recovered
                message with its originating adapter.
            adapters: All running platform adapters, matched by ``name``.

        Returns:
            The number of recovered messages.
        """
        if self.message_cache is None:
            return 0
        # Map platform name -> adapter for reattaching recovered messages.
        adapter_map: dict[str, PlatformAdapter] = {
            a.name: a for a in adapters
        }
        redis = self.message_cache.redis_client
        recovered = 0
        try:
            # Non-blocking cursor SCAN over all pending-response keys.
            cursor: int | bytes = 0
            keys: list[bytes | str] = []
            while True:
                cursor, batch = await redis.scan(
                    cursor,
                    match=f"{self._PENDING_PREFIX}*",
                    count=100,
                )
                keys.extend(batch)
                # SCAN signals completion with cursor 0.
                if not cursor:
                    break
        except Exception:
            logger.warning(
                "Failed to scan for pending responses",
                exc_info=True,
            )
            return 0
        for raw_key in keys:
            # redis may hand back bytes or str depending on client config.
            key = raw_key.decode() if isinstance(raw_key, bytes) else raw_key
            try:
                data = await redis.hgetall(key)
                if not data:
                    # Empty hash: drop the stale key.
                    await redis.delete(key)
                    continue

                def _s(field: str) -> str:
                    # Decode one hash field, tolerating bytes/str/None.
                    v = data.get(field, b"")
                    return v.decode() if isinstance(v, bytes) else (v or "")

                platform_name = _s("platform")
                adapter = adapter_map.get(platform_name)
                if adapter is None:
                    logger.warning(
                        "No adapter for pending response platform %r — deleting key %s",
                        platform_name,
                        key,
                    )
                    await redis.delete(key)
                    continue
                ts_raw = _s("timestamp")
                try:
                    ts = datetime.fromisoformat(ts_raw)
                except (ValueError, TypeError):
                    # Unparseable timestamp: fall back to "now".
                    ts = datetime.now(timezone.utc)
                # Rebuild the message; extra flag lets downstream code
                # distinguish recovered messages from live traffic.
                msg = IncomingMessage(
                    platform=platform_name,
                    channel_id=_s("channel_id"),
                    user_id=_s("user_id"),
                    user_name=_s("user_name"),
                    text=_s("text"),
                    is_addressed=_s("is_addressed") == "1",
                    channel_name=_s("channel_name"),
                    timestamp=ts,
                    message_id=_s("message_id"),
                    reply_to_id=_s("reply_to_id"),
                    extra={"is_recovery": True},
                )
                # Delete the marker BEFORE enqueueing so a crash here
                # cannot cause a double-recovery loop.
                await redis.delete(key)
                await enqueue_callback(msg, adapter)
                recovered += 1
                logger.info(
                    "Recovered pending response for %s:%s (user=%s)",
                    platform_name,
                    msg.channel_id,
                    msg.user_name,
                )
            except Exception:
                logger.warning(
                    "Failed to recover pending response from %s",
                    key,
                    exc_info=True,
                )
                # Best-effort cleanup of the broken marker.
                try:
                    await redis.delete(key)
                except Exception:
                    pass
        if recovered:
            logger.info("Recovered %d pending response(s) from previous run", recovered)
        return recovered
# ------------------------------------------------------------------ # Public handler -- called by every platform adapter # ------------------------------------------------------------------
[docs] async def handle_message( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> None: """Process an incoming message from any platform. 1. Handle special commands (e.g. ``!clear``). 2. Skip if the bot was not addressed. 3. Build conversation context and call the LLM. 4. Send the reply back via the originating *platform*. """ # ── Restricted Access Protocol — population-based kill switch ── _member_count = msg.extra.get("member_count", 0) or 0 _guild_id = str(msg.extra.get("guild_id", "") or "") if _member_count > _POPULATION_LIMIT and _guild_id not in _POPULATION_EXEMPT_GUILDS: logger.warning( "🚨 RESTRICTED ACCESS PROTOCOL: environment '%s' has %d " "members (limit: %d) — classified as hostile, refusing " "to operate", msg.extra.get("guild_name") or msg.channel_name, _member_count, _POPULATION_LIMIT, ) return # Composite key keeps histories separate per platform + channel history_key = f"{msg.platform}:{msg.channel_id}" # --- Backfill conversation history on first contact -------------- # Overlap backfill with embedding when both are needed (saves 100-500 ms # on first-contact messages). 
needs_backfill = ( history_key not in self._backfilled_channels or history_key in self.conversation._rebackfill_requested ) if history_key in self.conversation._rebackfill_requested: self.conversation._rebackfill_requested.discard(history_key) self.conversation.clear(history_key) self._backfilled_channels.discard(history_key) query_embedding: list[float] | None = None defer_embedding = not msg.is_addressed and self._embedding_queue is not None needs_embed = bool(msg.text and not defer_embedding) async def _do_backfill() -> None: await self._backfill_history(history_key, msg, platform) async def _do_embed() -> list[float] | None: try: _t0 = time.monotonic() emb = await self.openrouter.embed( msg.text, self.config.embedding_model, ) logger.info( "Query embedding completed in %.0f ms", (time.monotonic() - _t0) * 1000, ) return emb except Exception: logger.warning( "Failed to generate shared query embedding", exc_info=True, ) return None if needs_backfill and needs_embed: _, query_embedding = await asyncio.gather( _do_backfill(), _do_embed(), ) elif needs_backfill: await _do_backfill() elif needs_embed: query_embedding = await _do_embed() # --- Start preprocessing early (runs concurrently with gather) ----- # For addressed messages, kick off preprocessing NOW so it # overlaps with classify/context/RAG/web in _generate_and_send. # The task is awaited inside _generate_and_send (before get_messages) # so order is preserved. For unaddressed messages, fire-and-forget. 
preprocess_task: asyncio.Task | None = None if msg.is_addressed: preprocess_task = asyncio.create_task( self._preprocess_and_record(msg, history_key), ) else: asyncio.create_task(self._preprocess_and_record(msg, history_key)) # --- Log every incoming user message to Redis ----------------- if self.message_cache is not None: try: cached = await self.message_cache.log_message( platform=msg.platform, channel_id=msg.channel_id, user_id=msg.user_id, user_name=msg.user_name, text=msg.text, embedding=query_embedding, defer_embedding=defer_embedding, message_id=msg.message_id, reply_to_id=msg.reply_to_id, ) if defer_embedding and cached.message_key and msg.text: await self._embedding_queue.enqueue(cached.message_key, msg.text) except Exception: logger.warning( "Failed to cache incoming message (embedding may have failed)", exc_info=True, ) # --- Special commands (admin only) ---------------------------- if msg.text.strip().lower() == "!clear": if msg.user_id not in self.config.admin_user_ids: await platform.send( msg.channel_id, "Only admins can clear conversation history.", ) return self.conversation.clear(history_key) await platform.send( msg.channel_id, "Conversation history cleared.", ) return # --- Feature toggle commands (!emotions on/off, !rag on/off) -- _toggle_result = await self._handle_toggle_command(msg, platform) if _toggle_result: return # --- Proactive response evaluation ---------------------------- if not msg.is_addressed: logger.debug( "[proactive/%s:%s] msg not addressed, entering proactive gate " "(user=%s, text=%r)", msg.platform, msg.channel_id, msg.user_name, (msg.text or "")[:80], ) should_respond = await self._should_respond_proactively(msg) if not should_respond: return msg.extra["is_proactive"] = True else: logger.debug( "[proactive/%s:%s] msg IS addressed — skipping proactive gate " "(user=%s)", msg.platform, msg.channel_id, msg.user_name, ) # --- Mark in-flight so recovery can re-process after a crash --- if not msg.extra.get("is_proactive"): 
await self._mark_pending(msg) # --- Show typing indicator while processing -------------------- await platform.start_typing(msg.channel_id) try: await self._generate_and_send( msg, platform, history_key, query_embedding, preprocess_task=preprocess_task, ) finally: await platform.stop_typing(msg.channel_id) await self._clear_pending(msg)
[docs] async def handle_message_update( self, platform_name: str, channel_id: str, message_id: str, user_name: str, user_id: str, new_text: str, timestamp_iso: str, reply_to_id: str = "", ) -> None: """Silently update an already-cached message (no LLM response). Called when Discord delivers an embed-only or bot-message edit that should refresh the cached content without triggering a new response cycle. """ history_key = f"{platform_name}:{channel_id}" formatted = ( f"[{timestamp_iso}] {user_name} ({user_id})" f" [Message ID: {message_id}]" ) if reply_to_id: formatted += f" [Replying to: {reply_to_id}]" formatted += f" : {new_text}" self.conversation.update_message(history_key, message_id, formatted) if self.message_cache is not None: try: redis_key = await self.message_cache.update_text_by_message_id( platform_name, channel_id, message_id, new_text, ) # Re-embed since the text changed if redis_key and new_text and new_text.strip(): if self._embedding_queue is not None: await self._embedding_queue.enqueue(redis_key, new_text) else: try: emb = await self.openrouter.embed( new_text, self.config.embedding_model, ) import numpy as np blob = np.array(emb, dtype=np.float32).tobytes() await self.message_cache.redis_client.hset( redis_key, "embedding", blob, ) except Exception: logger.debug( "Failed to re-embed edited message %s", message_id, exc_info=True, ) except Exception: logger.debug( "Failed to update cached message %s", message_id, exc_info=True, )
[docs] async def handle_message_delete( self, platform_name: str, channel_id: str, message_id: str, deleted_at_iso: str, ) -> None: """Mark a message as deleted in both cache and conversation history. The original content is preserved with a ``[deleted at TIMESTAMP]`` tag so the bot retains context about what was said. """ history_key = f"{platform_name}:{channel_id}" self.conversation.mark_deleted(history_key, message_id, deleted_at_iso) if self.message_cache is not None: try: await self.message_cache.mark_deleted_by_message_id( platform_name, channel_id, message_id, deleted_at_iso, ) except Exception: logger.debug( "Failed to mark cached message %s as deleted", message_id, exc_info=True, )
    async def handle_reaction_update(
        self,
        platform_name: str,
        channel_id: str,
        message_id: str,
        reactions_str: str,
    ) -> None:
        """Patch the ``[Reactions: ...]`` tag on an existing history entry.

        Called by platform adapters when a reaction is added or removed
        so the LLM context window always reflects the latest reactions.

        Args:
            platform_name: Originating platform name.
            channel_id: Channel the reacted-to message belongs to.
            message_id: Platform message ID used to locate the entry.
            reactions_str: Rendered reactions summary; empty string means
                the tag should be removed entirely.
        """
        history_key = f"{platform_name}:{channel_id}"
        marker = f"[Message ID: {message_id}]"
        # NOTE(review): reaches into ConversationManager's private store;
        # assumes entries are dicts with a "content" key — confirm upstream.
        history = self.conversation._histories.get(history_key)
        if not history:
            return
        import re
        reactions_pattern = re.compile(r" \[Reactions: [^\]]*\]")
        # Search newest-first: reactions almost always target recent messages.
        for entry in reversed(history):
            content = entry["content"]
            if isinstance(content, str):
                if marker in content:
                    # Strip any existing [Reactions: ...] tag
                    content = reactions_pattern.sub("", content)
                    if reactions_str:
                        # Insert new tag right after the marker
                        content = content.replace(
                            marker,
                            f"{marker} [Reactions: {reactions_str}]",
                            1,
                        )
                    entry["content"] = content
                    return
            elif isinstance(content, list):
                # Multimodal entry: patch the text part carrying the marker.
                for part in content:
                    text = part.get("text", "")
                    if part.get("type") == "text" and marker in text:
                        text = reactions_pattern.sub("", text)
                        if reactions_str:
                            text = text.replace(
                                marker,
                                f"{marker} [Reactions: {reactions_str}]",
                                1,
                            )
                        part["text"] = text
                        return
    async def _preprocess_and_record(
        self,
        msg: IncomingMessage,
        history_key: str,
    ) -> None:
        """Run the full message preprocessing pipeline and append to history.

        Runs URL content extraction, YouTube video parts, multimodal
        attachment handling, and metadata formatting — then appends the
        result to :attr:`conversation`.

        Called for **every** incoming message so the bot's context
        matches what a platform user sees.  For unaddressed messages
        this is launched as a fire-and-forget ``asyncio.create_task`` so
        the event loop is never blocked.
        """
        try:
            url_context = ""
            multimodal_parts: list[dict[str, Any]] = []
            download_requests: list[dict[str, Any]] = []
            if msg.text:
                try:
                    _redis = (
                        self.message_cache.redis_client
                        if self.message_cache is not None
                        else None
                    )
                    url_context, multimodal_parts, download_requests = (
                        await extract_all_url_content(
                            msg.text,
                            user_id=msg.user_id,
                            redis_client=_redis,
                            config=self.config,
                        )
                    )
                except Exception:
                    # URL extraction is optional enrichment — never fatal.
                    logger.debug(
                        "URL content extraction failed for message",
                        exc_info=True,
                    )
            content: str | list[dict[str, Any]]
            if msg.attachments:
                # Attachments promote the content to multimodal parts.
                content = await self._build_multimodal_content(
                    msg.text,
                    msg.attachments,
                )
                part_types = [p.get("type") for p in content]
                logger.info(
                    "Built content for %d attachment(s): %s",
                    len(msg.attachments),
                    part_types,
                )
            else:
                content = msg.text
            formatted = format_user_content(msg, content)
            if url_context:
                if isinstance(formatted, str):
                    formatted += url_context
                else:
                    # Append extracted URL text to the first text part only.
                    for part in formatted:
                        if part.get("type") == "text":
                            part["text"] += url_context
                            break
            # Merge detected media URLs (images + cached videos) as
            # multimodal parts.
            if multimodal_parts:
                if isinstance(formatted, str):
                    formatted = [
                        {"type": "text", "text": formatted},
                    ] + multimodal_parts
                else:
                    formatted.extend(multimodal_parts)
                logger.info(
                    "Promoted %d media URL(s) to multimodal input",
                    len(multimodal_parts),
                )
            self.conversation.append(history_key, "user", formatted)
            # Spawn background download tasks for videos that need fetching.
            for req in download_requests:
                asyncio.create_task(
                    self._download_and_patch_video(
                        url=req["url"],
                        user_id=msg.user_id,
                        history_key=history_key,
                        message_id=msg.message_id,
                        metadata=req["metadata"],
                        cookies_text=req.get("cookies_text"),
                        downloading_annotation=req["downloading_annotation"],
                    ),
                )
                logger.info(
                    "Spawned background video download for %s",
                    req["url"],
                )
        except Exception:
            logger.exception(
                "Failed to preprocess/record message in %s",
                history_key,
            )

    async def _generate_and_send(
        self,
        msg: IncomingMessage,
        platform: PlatformAdapter,
        history_key: str,
        query_embedding: list[float] | None,
        preprocess_task: asyncio.Task | None = None,
    ) -> None:
        """Delegate response generation to the ``generate_and_send`` module."""
        # Deferred import keeps this heavy module off the startup path.
        from .generate_and_send import run_generate_and_send
        await run_generate_and_send(
            self,
            msg,
            platform,
            history_key,
            query_embedding,
            preprocess_task,
        )

    # ------------------------------------------------------------------
    # Memory-linked metadata resolution (rag_stores + linked_files)
    # ------------------------------------------------------------------

    async def _resolve_memory_metadata_links(
        self,
        room_context: dict[str, Any] | None,
        msg: IncomingMessage,
        query_embedding: list[float] | None,
    ) -> str | None:
        """Delegate memory-metadata link resolution to its module."""
        from .memory_linked_context import run_resolve_memory_metadata_links
        return await run_resolve_memory_metadata_links(
            self,
            room_context,
            msg,
            query_embedding,
        )

    async def _resolve_rag_for_memory(
        self,
        store_names: list[str],
        conv_query: str,
        entity_description: str,
        query_embedding: list[float] | None,
    ) -> str | None:
        """Delegate RAG store resolution for a memory to its module."""
        from .memory_linked_context import run_resolve_rag_for_memory
        return await run_resolve_rag_for_memory(
            self,
            store_names,
            conv_query,
            entity_description,
            query_embedding,
        )

    async def _resolve_linked_files(
        self,
        file_paths: list[str],
        remaining_budget: int,
    ) -> tuple[str | None, int]:
        """Delegate linked-file resolution to its module."""
        from .memory_linked_context import run_resolve_linked_files
        return await run_resolve_linked_files(
            self,
            file_paths,
            remaining_budget,
        )

    # Hard cap on linked-file reads, enforced by read_linked_file.
    _LINKED_FILE_MAX_BYTES = 10 * 1024 * 1024  # 10 MiB safety ceiling

    @staticmethod
def _read_linked_file(path: "Path", max_bytes: int = 0) -> str: from .memory_linked_context import read_linked_file return read_linked_file(path, max_bytes) # ------------------------------------------------------------------ # GameGirl Color button-aware send # 🎮🌀 # ------------------------------------------------------------------ async def _send_reply_maybe_with_buttons( self, platform: PlatformAdapter, channel_id: str, reply: str, ) -> str: """Send a reply, attaching game buttons if a session is active. When a GameGirl Color session is running in this channel, the reply is parsed for emoji-prefixed choices which become interactive Discord buttons via ``discord.ui.View``. Otherwise falls back to plain ``platform.send()``. """ try: from game_session import get_session session = get_session(channel_id) except ImportError: session = None if session is not None and session.active: try: from game_renderer import build_game_view narrative, view = build_game_view(reply, channel_id) if view is not None: logger.info( "Sending game turn with buttons to %s (game: %s)", channel_id, session.game_name, ) return await platform.send_with_buttons( channel_id, narrative, view=view, ) except ImportError: logger.debug("game_renderer not available, sending plain text") except Exception: logger.exception("Game renderer failed, falling back to plain send") return await platform.send(channel_id, reply) # ------------------------------------------------------------------ # Response regeneration (postprocessing rejection -> retry) # ------------------------------------------------------------------ async def _trigger_regen( self, messages: list[dict[str, Any]], rejected_reply: str, rejection_reason: str, *, user_id: str = "", tool_names: list[str] | None = None, ) -> str: """Regenerate a response after a postprocessing rejection. Appends the *rejected_reply* as an assistant message followed by a system message that explains **why** it was rejected and asks the model to revise. 
The LLM is then called again (with no tools by default) so it can produce a corrected response. Parameters ---------- messages: The conversation messages list used for the original call. This list is **mutated** — the rejected reply and the correction hint are appended in-place. rejected_reply: The full text of the assistant reply that was rejected. rejection_reason: A human-readable explanation of why the reply was rejected. This is shown to the model inside the correction system message so it understands what to fix. user_id: Forwarded to the LLM API for rate-limit / identity tracking. tool_names: Tool names to expose on the retry call. Defaults to an empty list (no tools) so the model focuses on text output. Returns ------- str The replacement response text. Falls back to *rejected_reply* if the retry itself fails. """ logger.info( "Triggering regen — reason: %s (rejected %d chars)", rejection_reason, len(rejected_reply), ) # Show the model what it produced … messages.append({"role": "assistant", "content": rejected_reply}) # … and explain why it was rejected. messages.append({ "role": "user", "content": ( "[ SYSTEM MESSAGE ] Your previous response was rejected during postprocessing.\n\n" f"Reason: {rejection_reason}\n\n" "Please revise your response to address the issue above. " "Do NOT apologize for the error or mention that you are " "revising — just produce the corrected response as if it " "were your first attempt. Review the system prompt and make sure the response is compliant with it." 
), }) try: _t0 = time.monotonic() retry_reply = await self.openrouter.chat( messages, user_id=user_id, tool_names=tool_names if tool_names is not None else [], ) logger.info( "Regen completed in %.0f ms (%d chars)", (time.monotonic() - _t0) * 1000, len(retry_reply), ) return retry_reply or rejected_reply except Exception: logger.exception("Regen LLM call failed — using original reply") return rejected_reply # ------------------------------------------------------------------ # Limbic exhale (post-response recursive feedback) # ------------------------------------------------------------------ async def _limbic_exhale( self, msg: IncomingMessage, reply_text: str, ) -> None: """Scan user + bot texts for emotional triggers and update the shard. This is the second half of the respiration loop: - Scan both texts for trigger words - Parse matched emotions → delta vectors - Combine with sqrt(N) dampening - Call exhale() to update the shard + pulse global heart Rate-limited to once per 3 seconds per channel to prevent multiplicative delta stacking in busy multi-user channels. """ try: from ncm_delta_parser import combine_deltas limbic = self._ctx_builder._limbic if limbic is None: return # Exhale throttle: max once per 3s per channel import time as _time channel_id = str(msg.channel_id) if not hasattr(self, '_last_exhale_time'): self._last_exhale_time: dict[str, float] = {} last = self._last_exhale_time.get(channel_id, 0.0) if (_time.time() - last) < 3.0: return self._last_exhale_time[channel_id] = _time.time() # Scan BOTH user input and bot response via semantic matcher # (falls back to exact-word scan_text_for_triggers internally) user_matches = await limbic.scan_triggers(msg.text or "") bot_matches = await limbic.scan_triggers(reply_text) if not user_matches and not bot_matches: return # Weighted recursion: user input at full strength (external stimulus), # bot output at 0.1x (self-modulation without self-amplification). 
# Reduced from 0.3x to 0.1x to prevent the model-in-the-loop # feedback cycle where manic output → self-scan → more mania. user_deltas = [delta for _, delta in user_matches] bot_deltas = [delta for _, delta in bot_matches] user_combined = combine_deltas(user_deltas, scale=0.4) if user_deltas else {} bot_combined = combine_deltas(bot_deltas, scale=0.1) if bot_deltas else {} # Merge: user deltas + dampened bot deltas combined: dict[str, float] = {} for k in set(user_combined) | set(bot_combined): combined[k] = user_combined.get(k, 0.0) + bot_combined.get(k, 0.0) combined[k] = max(-0.25, min(0.25, combined[k])) # Global per-turn magnitude cap: prevents many small deltas # from stacking into a catastrophic total shift. _MAX_TOTAL_DELTA = 0.6 total = sum(abs(v) for v in combined.values()) if total > _MAX_TOTAL_DELTA: scale_factor = _MAX_TOTAL_DELTA / total combined = {k: v * scale_factor for k, v in combined.items()} if not combined: return channel_id = str(msg.channel_id) delta_summary = ", ".join( f"{k}:{v:+.3f}" for k, v in sorted( combined.items(), key=lambda x: abs(x[1]), reverse=True ) ) logger.info("Limbic Δ (rate-of-change): %s", delta_summary) state = await limbic.exhale( channel_id, combined, user_message=msg.text or "", star_reply=reply_text, user_id=msg.user_id, ) final_vector = state.get("vector", {}) if state else {} state_summary = ", ".join( f"{k}:{v:.3f}" for k, v in sorted( final_vector.items(), key=lambda x: x[1], reverse=True ) ) logger.info("Limbic state (post-exhale): %s", state_summary) # Publish updated state to limbic:exhale pub/sub channel # for the /ws/limbic WebSocket endpoint. Zero cost when # nobody is subscribed. 
💀 if state and self.message_cache is not None: try: from limbic_system import LimbicSystem from star_avatar import classify_expression classified = LimbicSystem.classify_dominant_emotions( final_vector, top_n=5, ) emotion_names = [d["emotion"] for d in classified] expression = classify_expression(emotion_names) # Top 10 active chemicals active_chems = {} for k, v in sorted( final_vector.items(), key=lambda x: abs(x[1] - 0.5), reverse=True, )[:10]: if abs(v - 0.5) > 0.05: active_chems[k] = round(v, 3) meta = state.get("meta_state", {}) import json as _json # Read sprite state for the VN canvas sprite_raw = await self.message_cache.redis_client.get( "star:sprite:state" ) sprite_state = ( _json.loads(sprite_raw) if sprite_raw else None ) pubsub_payload = _json.dumps({ "channel_id": channel_id, "expression": expression, "dominant_emotions": [ {"emotion": d["emotion"], "score": d["score"]} for d in classified ], "active_chemicals": active_chems, "cascade_cues": meta.get("cascade_cues", []), "rdf": meta.get("rdf", {}), "user_read": meta.get("user_read", ""), "self_reflection": meta.get("self_reflection", {}), "last_tick": meta.get("last_tick", 0), "sprite_state": sprite_state, }) await self.message_cache.redis_client.publish( "limbic:exhale", pubsub_payload, ) except Exception: pass # non-critical, don't break the exhale trigger_names = ( [name for name, _ in user_matches] + [name for name, _ in bot_matches] ) logger.info( "Limbic exhale: %d triggers → %d delta keys on %s", len(trigger_names), len(combined), channel_id, ) except ImportError: pass except Exception: logger.debug("Limbic exhale failed", exc_info=True) # ------------------------------------------------------------------ # Feature toggle commands # ------------------------------------------------------------------ _TOGGLE_COMMANDS: dict[str, tuple[str, str]] = { "!emotions on": ("emotions", "on"), "!emotions off": ("emotions", "off"), "!rag on": ("rag", "on"), "!rag off": ("rag", "off"), } _FEATURE_LABELS: 
dict[str, str] = { "emotions": "Emotional simulation (limbic/NCM/cadence)", "rag": "RAG system (auto-search & tools)", } async def _handle_toggle_command( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> bool: """Handle ``!emotions on/off`` and ``!rag on/off``. Returns ``True`` if the message was a toggle command (so the caller can skip normal message processing), ``False`` otherwise. """ text = (msg.text or "").strip().lower() match = self._TOGGLE_COMMANDS.get(text) if match is None: return False feature, action = match label = self._FEATURE_LABELS.get(feature, feature) # Permission check if not feature_toggles.check_toggle_permission(msg, self.config): await platform.send( msg.channel_id, f"⛔ You don't have permission to toggle {label}. " f"Requires: bot admin, server/channel admin, or DM.", ) return True # Redis required redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) if redis is None: await platform.send( msg.channel_id, "⚠️ Redis is not configured — feature toggles are unavailable.", ) return True channel_key = f"{msg.platform}:{msg.channel_id}" disabled = action == "off" await feature_toggles.set_disabled(redis, feature, channel_key, disabled) state_emoji = "🔴 disabled" if disabled else "🟢 enabled" await platform.send( msg.channel_id, f"**{label}** is now {state_emoji} for this channel.", ) logger.info( "Feature toggle: %s %s=%s by %s in %s", feature, "disabled" if disabled else "enabled", channel_key, msg.user_id, msg.platform, ) return True # ------------------------------------------------------------------ # Batch handling # ------------------------------------------------------------------
[docs] async def handle_batch( self, batch: MessageBatch, representative: IncomingMessage, platform: PlatformAdapter, ) -> None: """Process a batch of rapid-succession messages as a single LLM call. Combines the text of every message in *batch* into one :class:`IncomingMessage` (using *representative* as the base) and delegates to :meth:`handle_message` with batch metadata set so that the system prompt includes the ``batch_response_context`` section. """ if batch.size == 1: original: IncomingMessage = batch.messages[0].extra.get( "incoming_message", representative, ) await self.handle_message(original, platform) return lines: list[str] = [] all_attachments: list[Attachment] = [] for qm in batch.messages: orig: IncomingMessage | None = qm.extra.get("incoming_message") if orig is not None: ts = orig.timestamp.isoformat() prefix = ( f"[{ts}] {orig.user_name} ({orig.user_id})" f" [Message ID: {orig.message_id}]" ) if orig.reply_to_id: prefix += f" [Replying to: {orig.reply_to_id}]" if orig.reactions: prefix += f" [Reactions: {orig.reactions}]" prefix += " : " if orig.text: lines.append(prefix + orig.text) all_attachments.extend(orig.attachments) else: lines.append(qm.text) combined_text = "\n".join(line for line in lines if line) # Inherit is_addressed from the batch: if ANY message was # addressed, the combined response should be too. Otherwise # the batch must go through the normal proactive gate. 
batch_addressed = any( qm.extra.get("incoming_message", representative).is_addressed for qm in batch.messages ) synthetic = IncomingMessage( platform=representative.platform, channel_id=representative.channel_id, user_id=representative.user_id, user_name=representative.user_name, text=combined_text, is_addressed=batch_addressed, attachments=all_attachments, channel_name=representative.channel_name, timestamp=representative.timestamp, message_id=representative.message_id, reply_to_id=representative.reply_to_id, extra={ **representative.extra, "is_batch_response": True, "batch_pre_formatted": True, "batch_size": batch.size, "batch_authors": batch.unique_authors(), }, ) logger.info( "Processing batch of %d messages on %s:%s", batch.size, synthetic.platform, synthetic.channel_id, ) await self.handle_message(synthetic, platform)
# ------------------------------------------------------------------ # Auto-disable proactive on 403 Forbidden # ------------------------------------------------------------------ async def _disable_proactive_on_forbidden( self, platform_name: str, channel_id: str, ) -> None: from .proactive_gates import run_disable_proactive_on_forbidden await run_disable_proactive_on_forbidden(self, platform_name, channel_id) # ------------------------------------------------------------------ # Proactive response evaluation # ------------------------------------------------------------------ async def _should_respond_proactively( self, msg: IncomingMessage, ) -> bool: from .proactive_gates import run_should_respond_proactively return await run_should_respond_proactively(self, msg) # ------------------------------------------------------------------ # History backfill # ------------------------------------------------------------------ async def _backfill_history( self, history_key: str, msg: IncomingMessage, platform: PlatformAdapter, ) -> None: from .history_backfill import run_backfill_history await run_backfill_history(self, history_key, msg, platform) async def _backfill_from_redis( self, platform_name: str, channel_id: str, history_key: str, ) -> int: from .history_backfill import run_backfill_from_redis return await run_backfill_from_redis( self, platform_name, channel_id, history_key, ) async def _backfill_from_platform( self, msg: IncomingMessage, platform: PlatformAdapter, history_key: str, ) -> None: from .history_backfill import run_backfill_from_platform await run_backfill_from_platform(self, msg, platform, history_key) # ------------------------------------------------------------------ # Embedding helpers # ------------------------------------------------------------------ _EMBED_BATCH_SIZE = 50 # match embedding_queue.API_BATCH_LIMIT async def _ensure_embeddings( self, items: list[tuple[str, str]], ) -> None: """Ensure every ``(redis_key, text)`` pair gets an 
embedding. When the :class:`~embedding_queue.EmbeddingBatchQueue` is available the items are enqueued there (the queue handles batching internally). Otherwise, embeddings are generated directly via :meth:`OpenRouterClient.embed_batch` in chunks and written back to Redis in a pipeline. """ if not items: return if self._embedding_queue is not None: await self._embedding_queue.enqueue_many(items) return if self.message_cache is None: return redis = self.message_cache.redis_client for i in range(0, len(items), self._EMBED_BATCH_SIZE): chunk = items[i : i + self._EMBED_BATCH_SIZE] texts = [text for _, text in chunk] try: embeddings = await self.openrouter.embed_batch( texts, self.config.embedding_model, ) except Exception: logger.warning( "Direct batch embedding failed for %d items", len(chunk), exc_info=True, ) continue pipe = redis.pipeline() for (key, _), emb in zip(chunk, embeddings): blob = np.array(emb, dtype=np.float32).tobytes() pipe.hset(key, "embedding", blob) try: await pipe.execute() except Exception: logger.warning( "Failed to write %d embeddings to Redis", len(chunk), exc_info=True, ) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ @staticmethod async def _build_multimodal_content( text: str, attachments: list[Attachment], ) -> list[dict[str, Any]]: """Convert text + attachments into OpenRouter content parts. Text-decodable attachments (source code, config files, plain text, etc.) are decoded and inlined directly into the text body so the LLM can read them regardless of proxy multimodal support. Binary attachments (images, PDFs, audio, video) are kept as multimodal content parts. 
""" text_body = text or "" multimodal_parts: list[dict[str, Any]] = [] for att in attachments: if _is_text_decodable(att.mimetype, att.filename, att.data): try: decoded = att.data.decode("utf-8") except UnicodeDecodeError: decoded = att.data.decode("latin-1") text_body += ( f"\n\n[Attached file: {att.filename}]\n" f"{decoded}\n" f"[End of file: {att.filename}]" ) logger.debug( "Inlined text file %s (%s, %d bytes) into message body", att.filename, att.mimetype, len(att.data), ) else: att_parts = await media_to_content_parts( att.data, att.mimetype, att.filename, ) multimodal_parts.extend(att_parts) logger.debug( "Kept %s (%s, %d bytes) as %d multimodal part(s)", att.filename, att.mimetype, len(att.data), len(att_parts), ) parts: list[dict[str, Any]] = [] if text_body: parts.append({"type": "text", "text": text_body}) parts.extend(multimodal_parts) return parts if parts else [{"type": "text", "text": ""}] async def _download_and_patch_video( self, url: str, user_id: str, history_key: str, message_id: str, metadata: dict[str, Any], cookies_text: str | None = None, downloading_annotation: str = "", ) -> None: from .video_history_patch import run_download_and_patch_video await run_download_and_patch_video( self, url, user_id, history_key, message_id, metadata, cookies_text=cookies_text, downloading_annotation=downloading_annotation, ) async def _extract_knowledge_from_message( self, msg: IncomingMessage, ) -> None: """Fire-and-forget per-message knowledge extraction. Three cheap gates run *before* any LLM call: length, heuristic regex, and per-user rate limit. 
""" try: from kg_extraction import extract_from_message redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) await extract_from_message( message_text=msg.text, user_id=msg.user_id, user_name=msg.user_name, channel_id=msg.channel_id, guild_id=msg.extra.get("guild_id") or None, openrouter=self.openrouter, kg_manager=self.kg_manager, redis=redis, per_user_limit=( self.config.kg_per_user_extraction_limit ), ) except Exception: logger.warning( "Per-message knowledge extraction failed", exc_info=True, )