"""Central message processor shared across all platforms.
Receives :class:`~platforms.base.IncomingMessage` instances from any
:class:`~platforms.base.PlatformAdapter`, manages conversation history,
calls the LLM via :class:`~openrouter_client.OpenRouterClient`, and
sends the reply back through the originating platform.
"""
from __future__ import annotations
import asyncio
import json
import logging
import os
import random
import time
import uuid
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Awaitable, Callable
import numpy as np
from config import Config
from conversation import ConversationManager
from message_queue import MessageBatch
from openrouter_client import OpenRouterClient
from platforms.base import Attachment, IncomingMessage, PlatformAdapter
from platforms.media_common import media_to_content_parts
from prompt_context import PromptContextBuilder
from response_postprocessor import (
extract_status_tags,
llm_filter_response,
postprocess_intermediate_response,
postprocess_response,
)
import feature_toggles
from tool_context import ToolContext
from url_content_extractor import extract_all_url_content
from .assistant_history import _assistant_history_content_with_sent_files
from .population_gate import _POPULATION_EXEMPT_GUILDS, _POPULATION_LIMIT
from .text_attachments import _is_text_decodable
from .user_message_format import format_user_content
if TYPE_CHECKING:
from classifiers.vector_classifier import VectorClassifier
from embedding_queue import EmbeddingBatchQueue
from knowledge_graph import KnowledgeGraphManager
from message_cache import MessageCache
from rag_system.auto_search import RAGAutoSearchManager
from status_manager import StatusManager
from task_manager import TaskManager
from threadweave import ThreadweaveManager
from web_search_context import WebSearchContextManager
logger = logging.getLogger(__name__)
class MessageProcessor:
"""Platform-agnostic message handler.
Parameters
----------
config:
Global bot configuration (forwarded to the context builder).
conversation_manager:
Manages per-channel conversation histories.
openrouter:
The OpenRouter API client for LLM completions.
message_cache:
Optional Redis-backed message cache. When provided, every
message the bot sees is logged with an embedding vector.
"""
def __init__(
    self,
    config: Config,
    conversation_manager: ConversationManager,
    openrouter: OpenRouterClient,
    message_cache: MessageCache | None = None,
    kg_manager: KnowledgeGraphManager | None = None,
    auto_search: RAGAutoSearchManager | None = None,
    task_manager: TaskManager | None = None,
    classifier: VectorClassifier | None = None,
    threadweave: ThreadweaveManager | None = None,
    embedding_queue: EmbeddingBatchQueue | None = None,
    status_manager: StatusManager | None = None,
    web_search: WebSearchContextManager | None = None,
) -> None:
    """Wire the processor to its collaborators.

    Only *config*, *conversation_manager* and *openrouter* are
    required; every other argument is an optional subsystem and may
    be ``None``, which simply disables the corresponding feature.
    """
    # Required collaborators.
    self.config = config
    self.conversation = conversation_manager
    self.openrouter = openrouter
    # Optional subsystems — None disables the feature.
    self.message_cache = message_cache
    self.kg_manager = kg_manager
    self._auto_search = auto_search
    self._task_manager = task_manager
    self._classifier = classifier
    self._threadweave = threadweave
    self._embedding_queue = embedding_queue
    self._status_manager = status_manager
    self._web_search = web_search
    # Channels whose history has already been backfilled this run.
    self._backfilled_channels: set[str] = set()
    # The context builder shares most of the same collaborators.
    self._ctx_builder = PromptContextBuilder(
        config,
        kg_manager=kg_manager,
        threadweave_manager=threadweave,
        status_manager=status_manager,
        message_cache=message_cache,
        task_manager=task_manager,
        conversation_manager=conversation_manager,
        openrouter_client=openrouter,
    )
    # 💀 Cadence Post-Processor — code-level text degradation.
    # Optional dependency: stays None when the module is absent.
    try:
        from cadence_refiner import CadencePostProcessor
    except ImportError:
        self._cadence_processor = None
        logger.debug("cadence_refiner not available")
    else:
        self._cadence_processor = CadencePostProcessor()
# ------------------------------------------------------------------
# Pending-response tracking (Redis)
# ------------------------------------------------------------------
# Redis key prefix for in-flight response markers (see _mark_pending).
_PENDING_PREFIX = "pending_response:"
# TTL so stale markers self-expire even if _clear_pending never runs.
_PENDING_TTL = 3600  # 1 hour safety net
async def _mark_pending(self, msg: IncomingMessage) -> None:
    """Persist a Redis marker describing an in-flight response.

    :meth:`_clear_pending` deletes the marker once the reply has been
    delivered. If the bot crashes in between, the surviving marker is
    picked up by :meth:`recover_pending_responses` on the next
    startup. Best-effort: failures are logged at debug level only.
    """
    cache = self.message_cache
    if cache is None:
        return
    key = f"{self._PENDING_PREFIX}{msg.platform}:{msg.channel_id}"
    fields = {
        "platform": msg.platform,
        "channel_id": msg.channel_id,
        "user_id": msg.user_id,
        "user_name": msg.user_name,
        "text": msg.text or "",
        "message_id": msg.message_id or "",
        "reply_to_id": msg.reply_to_id or "",
        "channel_name": msg.channel_name or "",
        "is_addressed": "1" if msg.is_addressed else "0",
        "timestamp": msg.timestamp.isoformat(),
        "started_at": datetime.now(timezone.utc).isoformat(),
    }
    try:
        # HSET + EXPIRE in one pipeline round-trip.
        pipe = cache.redis_client.pipeline()
        pipe.hset(key, mapping=fields)
        pipe.expire(key, self._PENDING_TTL)
        await pipe.execute()
    except Exception:
        logger.debug("Failed to mark pending response", exc_info=True)
async def _clear_pending(self, msg: IncomingMessage) -> None:
    """Delete the in-flight marker once the reply has been sent."""
    cache = self.message_cache
    if cache is None:
        return
    try:
        await cache.redis_client.delete(
            f"{self._PENDING_PREFIX}{msg.platform}:{msg.channel_id}",
        )
    except Exception:
        logger.debug("Failed to clear pending response", exc_info=True)
async def recover_pending_responses(
    self,
    enqueue_callback: Callable[[IncomingMessage, PlatformAdapter], Awaitable[None]],
    adapters: list[PlatformAdapter],
) -> int:
    """Re-enqueue messages whose responses were interrupted by a restart.

    Called once during startup after all platform adapters are
    running. Returns the number of recovered messages.

    Args:
        enqueue_callback: Awaited with each reconstructed message and
            its adapter; expected to put the pair back on the normal
            processing queue.
        adapters: Running adapters, matched to markers by ``name``.
    """
    if self.message_cache is None:
        return 0
    # Map adapter name -> adapter so each marker can be routed back
    # to its originating platform.
    adapter_map: dict[str, PlatformAdapter] = {
        a.name: a for a in adapters
    }
    redis = self.message_cache.redis_client
    recovered = 0
    try:
        # Incremental SCAN (not KEYS) so Redis is never blocked.
        cursor: int | bytes = 0
        keys: list[bytes | str] = []
        while True:
            cursor, batch = await redis.scan(
                cursor, match=f"{self._PENDING_PREFIX}*", count=100,
            )
            keys.extend(batch)
            # A zero cursor signals the iteration is complete.
            if not cursor:
                break
    except Exception:
        logger.warning(
            "Failed to scan for pending responses", exc_info=True,
        )
        return 0
    for raw_key in keys:
        # The client may return bytes or str depending on its
        # decode_responses configuration — normalize to str.
        key = raw_key.decode() if isinstance(raw_key, bytes) else raw_key
        try:
            data = await redis.hgetall(key)
            if not data:
                # Empty hash: stale marker, just drop it.
                await redis.delete(key)
                continue
            def _s(field: str) -> str:
                # Read one hash field, tolerating bytes/str/missing.
                v = data.get(field, b"")
                return v.decode() if isinstance(v, bytes) else (v or "")
            platform_name = _s("platform")
            adapter = adapter_map.get(platform_name)
            if adapter is None:
                logger.warning(
                    "No adapter for pending response platform %r — deleting key %s",
                    platform_name, key,
                )
                await redis.delete(key)
                continue
            ts_raw = _s("timestamp")
            try:
                ts = datetime.fromisoformat(ts_raw)
            except (ValueError, TypeError):
                # Unparsable stored timestamp — fall back to "now".
                ts = datetime.now(timezone.utc)
            msg = IncomingMessage(
                platform=platform_name,
                channel_id=_s("channel_id"),
                user_id=_s("user_id"),
                user_name=_s("user_name"),
                text=_s("text"),
                is_addressed=_s("is_addressed") == "1",
                channel_name=_s("channel_name"),
                timestamp=ts,
                message_id=_s("message_id"),
                reply_to_id=_s("reply_to_id"),
                # Flag so downstream code can detect a replayed message.
                extra={"is_recovery": True},
            )
            # Delete BEFORE enqueueing: if processing crashes again,
            # the message is not re-recovered in an endless loop.
            await redis.delete(key)
            await enqueue_callback(msg, adapter)
            recovered += 1
            logger.info(
                "Recovered pending response for %s:%s (user=%s)",
                platform_name, msg.channel_id, msg.user_name,
            )
        except Exception:
            logger.warning(
                "Failed to recover pending response from %s",
                key, exc_info=True,
            )
            # Drop the broken marker so it cannot wedge future startups.
            try:
                await redis.delete(key)
            except Exception:
                pass
    if recovered:
        logger.info("Recovered %d pending response(s) from previous run", recovered)
    return recovered
# ------------------------------------------------------------------
# Public handler -- called by every platform adapter
# ------------------------------------------------------------------
async def handle_message(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> None:
    """Process an incoming message from any platform.

    1. Enforce the population-based access gate.
    2. Backfill history and/or embed the query text (run concurrently
       when both are needed).
    3. Handle special commands (``!clear``, feature toggles).
    4. Apply the proactive-response gate when the bot was not addressed.
    5. Build conversation context, call the LLM, and send the reply
       back via the originating *platform*.
    """
    # ── Restricted Access Protocol — population-based kill switch ──
    _member_count = msg.extra.get("member_count", 0) or 0
    _guild_id = str(msg.extra.get("guild_id", "") or "")
    if _member_count > _POPULATION_LIMIT and _guild_id not in _POPULATION_EXEMPT_GUILDS:
        logger.warning(
            "🚨 RESTRICTED ACCESS PROTOCOL: environment '%s' has %d "
            "members (limit: %d) — classified as hostile, refusing "
            "to operate",
            msg.extra.get("guild_name") or msg.channel_name,
            _member_count,
            _POPULATION_LIMIT,
        )
        return
    # Composite key keeps histories separate per platform + channel
    history_key = f"{msg.platform}:{msg.channel_id}"
    # --- Backfill conversation history on first contact --------------
    # Overlap backfill with embedding when both are needed (saves 100-500 ms
    # on first-contact messages).
    needs_backfill = (
        history_key not in self._backfilled_channels
        or history_key in self.conversation._rebackfill_requested
    )
    if history_key in self.conversation._rebackfill_requested:
        # A re-backfill was requested: reset local state so the
        # channel history is rebuilt from scratch below.
        self.conversation._rebackfill_requested.discard(history_key)
        self.conversation.clear(history_key)
        self._backfilled_channels.discard(history_key)
    query_embedding: list[float] | None = None
    # Unaddressed messages defer embedding to the batch queue.
    defer_embedding = not msg.is_addressed and self._embedding_queue is not None
    needs_embed = bool(msg.text and not defer_embedding)

    async def _do_backfill() -> None:
        await self._backfill_history(history_key, msg, platform)

    async def _do_embed() -> list[float] | None:
        # Embedding failures degrade gracefully to None.
        try:
            _t0 = time.monotonic()
            emb = await self.openrouter.embed(
                msg.text, self.config.embedding_model,
            )
            logger.info(
                "Query embedding completed in %.0f ms",
                (time.monotonic() - _t0) * 1000,
            )
            return emb
        except Exception:
            logger.warning(
                "Failed to generate shared query embedding",
                exc_info=True,
            )
            return None

    if needs_backfill and needs_embed:
        _, query_embedding = await asyncio.gather(
            _do_backfill(), _do_embed(),
        )
    elif needs_backfill:
        await _do_backfill()
    elif needs_embed:
        query_embedding = await _do_embed()
    # --- Start preprocessing early (runs concurrently with gather) -----
    # For addressed messages, kick off preprocessing NOW so it
    # overlaps with classify/context/RAG/web in _generate_and_send.
    # The task is awaited inside _generate_and_send (before get_messages)
    # so order is preserved. For unaddressed messages, fire-and-forget.
    preprocess_task: asyncio.Task | None = None
    if msg.is_addressed:
        preprocess_task = asyncio.create_task(
            self._preprocess_and_record(msg, history_key),
        )
    else:
        asyncio.create_task(self._preprocess_and_record(msg, history_key))
    # --- Log every incoming user message to Redis -----------------
    if self.message_cache is not None:
        try:
            cached = await self.message_cache.log_message(
                platform=msg.platform,
                channel_id=msg.channel_id,
                user_id=msg.user_id,
                user_name=msg.user_name,
                text=msg.text,
                embedding=query_embedding,
                defer_embedding=defer_embedding,
                message_id=msg.message_id,
                reply_to_id=msg.reply_to_id,
            )
            # defer_embedding implies _embedding_queue is not None.
            if defer_embedding and cached.message_key and msg.text:
                await self._embedding_queue.enqueue(cached.message_key, msg.text)
        except Exception:
            logger.warning(
                "Failed to cache incoming message (embedding may have failed)",
                exc_info=True,
            )
    # --- Special commands (admin only) ----------------------------
    # BUGFIX: msg.text can be None (e.g. attachment-only messages) —
    # guard before calling str methods, as every other text access
    # in this class does.
    if (msg.text or "").strip().lower() == "!clear":
        if msg.user_id not in self.config.admin_user_ids:
            await platform.send(
                msg.channel_id,
                "Only admins can clear conversation history.",
            )
            return
        self.conversation.clear(history_key)
        await platform.send(
            msg.channel_id,
            "Conversation history cleared.",
        )
        return
    # --- Feature toggle commands (!emotions on/off, !rag on/off) --
    _toggle_result = await self._handle_toggle_command(msg, platform)
    if _toggle_result:
        return
    # --- Proactive response evaluation ----------------------------
    if not msg.is_addressed:
        logger.debug(
            "[proactive/%s:%s] msg not addressed, entering proactive gate "
            "(user=%s, text=%r)",
            msg.platform, msg.channel_id, msg.user_name,
            (msg.text or "")[:80],
        )
        should_respond = await self._should_respond_proactively(msg)
        if not should_respond:
            return
        msg.extra["is_proactive"] = True
    else:
        logger.debug(
            "[proactive/%s:%s] msg IS addressed — skipping proactive gate "
            "(user=%s)",
            msg.platform, msg.channel_id, msg.user_name,
        )
    # --- Mark in-flight so recovery can re-process after a crash ---
    if not msg.extra.get("is_proactive"):
        await self._mark_pending(msg)
    # --- Show typing indicator while processing --------------------
    await platform.start_typing(msg.channel_id)
    try:
        await self._generate_and_send(
            msg, platform, history_key, query_embedding,
            preprocess_task=preprocess_task,
        )
    finally:
        await platform.stop_typing(msg.channel_id)
        await self._clear_pending(msg)
async def handle_message_update(
    self,
    platform_name: str,
    channel_id: str,
    message_id: str,
    user_name: str,
    user_id: str,
    new_text: str,
    timestamp_iso: str,
    reply_to_id: str = "",
) -> None:
    """Silently update an already-cached message (no LLM response).

    Called when Discord delivers an embed-only or bot-message edit
    that should refresh the cached content without triggering a new
    response cycle. Rebuilds the formatted history line, patches the
    in-memory conversation entry, updates the Redis cache, and
    re-embeds the new text (via the batch queue when available,
    otherwise inline).
    """
    history_key = f"{platform_name}:{channel_id}"
    formatted = (
        f"[{timestamp_iso}] {user_name} ({user_id})"
        f" [Message ID: {message_id}]"
    )
    if reply_to_id:
        formatted += f" [Replying to: {reply_to_id}]"
    formatted += f" : {new_text}"
    self.conversation.update_message(history_key, message_id, formatted)
    if self.message_cache is None:
        return
    try:
        redis_key = await self.message_cache.update_text_by_message_id(
            platform_name, channel_id, message_id, new_text,
        )
        # Re-embed since the text changed
        if redis_key and new_text and new_text.strip():
            if self._embedding_queue is not None:
                # Preferred path: the batch queue amortizes API calls.
                await self._embedding_queue.enqueue(redis_key, new_text)
            else:
                try:
                    emb = await self.openrouter.embed(
                        new_text, self.config.embedding_model,
                    )
                    # FIX: use the module-level numpy import instead of
                    # re-importing numpy locally on every edit.
                    blob = np.array(emb, dtype=np.float32).tobytes()
                    await self.message_cache.redis_client.hset(
                        redis_key, "embedding", blob,
                    )
                except Exception:
                    logger.debug(
                        "Failed to re-embed edited message %s",
                        message_id, exc_info=True,
                    )
    except Exception:
        logger.debug(
            "Failed to update cached message %s", message_id,
            exc_info=True,
        )
async def handle_message_delete(
    self,
    platform_name: str,
    channel_id: str,
    message_id: str,
    deleted_at_iso: str,
) -> None:
    """Flag *message_id* as deleted in history and, if present, the cache.

    The original content is preserved with a ``[deleted at TIMESTAMP]``
    tag so the bot retains context about what was said.
    """
    key = f"{platform_name}:{channel_id}"
    self.conversation.mark_deleted(key, message_id, deleted_at_iso)
    cache = self.message_cache
    if cache is None:
        return
    try:
        await cache.mark_deleted_by_message_id(
            platform_name, channel_id, message_id, deleted_at_iso,
        )
    except Exception:
        logger.debug(
            "Failed to mark cached message %s as deleted",
            message_id, exc_info=True,
        )
async def handle_reaction_update(
    self,
    platform_name: str,
    channel_id: str,
    message_id: str,
    reactions_str: str,
) -> None:
    """Patch the ``[Reactions: ...]`` tag on an existing history entry.

    Called by platform adapters when a reaction is added or removed
    so the LLM context window always reflects the latest reactions.
    An empty *reactions_str* removes the tag entirely.
    """
    history_key = f"{platform_name}:{channel_id}"
    marker = f"[Message ID: {message_id}]"
    # NOTE(review): reaches into ConversationManager internals —
    # assumes _histories maps key -> list of {"role","content"} dicts;
    # confirm against ConversationManager.
    history = self.conversation._histories.get(history_key)
    if not history:
        return
    import re
    reactions_pattern = re.compile(r" \[Reactions: [^\]]*\]")
    # Scan newest-first; stop at the first entry holding the marker.
    for entry in reversed(history):
        content = entry["content"]
        if isinstance(content, str):
            if marker in content:
                # Strip any existing [Reactions: ...] tag
                content = reactions_pattern.sub("", content)
                if reactions_str:
                    # Insert new tag right after the marker
                    content = content.replace(
                        marker,
                        f"{marker} [Reactions: {reactions_str}]",
                        1,
                    )
                entry["content"] = content
                return
        elif isinstance(content, list):
            # Multimodal entry: the marker lives in a text part.
            for part in content:
                text = part.get("text", "")
                if part.get("type") == "text" and marker in text:
                    text = reactions_pattern.sub("", text)
                    if reactions_str:
                        text = text.replace(
                            marker,
                            f"{marker} [Reactions: {reactions_str}]",
                            1,
                        )
                    part["text"] = text
                    return
async def _preprocess_and_record(
    self,
    msg: IncomingMessage,
    history_key: str,
) -> None:
    """Run the full message preprocessing pipeline and append to history.

    Runs URL content extraction, YouTube video parts, multimodal
    attachment handling, and metadata formatting — then appends the
    result to :attr:`conversation`. Called for **every** incoming
    message so the bot's context matches what a platform user sees.

    For unaddressed messages this is launched as a fire-and-forget
    ``asyncio.create_task`` so the event loop is never blocked.
    """
    try:
        url_context = ""
        multimodal_parts: list[dict[str, Any]] = []
        download_requests: list[dict[str, Any]] = []
        if msg.text:
            try:
                _redis = (
                    self.message_cache.redis_client
                    if self.message_cache is not None
                    else None
                )
                # Extract inline URL content: a text summary, media
                # parts ready for the model, and videos that still
                # need a background download.
                url_context, multimodal_parts, download_requests = (
                    await extract_all_url_content(
                        msg.text,
                        user_id=msg.user_id,
                        redis_client=_redis,
                        config=self.config,
                    )
                )
            except Exception:
                # Best-effort: a failed extraction never blocks the message.
                logger.debug(
                    "URL content extraction failed for message",
                    exc_info=True,
                )
        content: str | list[dict[str, Any]]
        if msg.attachments:
            # Attachments force a multimodal (list-of-parts) content form.
            content = await self._build_multimodal_content(
                msg.text, msg.attachments,
            )
            part_types = [p.get("type") for p in content]
            logger.info(
                "Built content for %d attachment(s): %s",
                len(msg.attachments), part_types,
            )
        else:
            content = msg.text
        formatted = format_user_content(msg, content)
        if url_context:
            # Append extracted URL text to the string, or to the
            # first text part of a multimodal list.
            if isinstance(formatted, str):
                formatted += url_context
            else:
                for part in formatted:
                    if part.get("type") == "text":
                        part["text"] += url_context
                        break
        # Merge detected media URLs (images + cached videos) as
        # multimodal parts.
        if multimodal_parts:
            if isinstance(formatted, str):
                # Promote the plain string to a parts list first.
                formatted = [
                    {"type": "text", "text": formatted},
                ] + multimodal_parts
            else:
                formatted.extend(multimodal_parts)
            logger.info(
                "Promoted %d media URL(s) to multimodal input",
                len(multimodal_parts),
            )
        self.conversation.append(history_key, "user", formatted)
        # Spawn background download tasks for videos that need fetching.
        for req in download_requests:
            asyncio.create_task(
                self._download_and_patch_video(
                    url=req["url"],
                    user_id=msg.user_id,
                    history_key=history_key,
                    message_id=msg.message_id,
                    metadata=req["metadata"],
                    cookies_text=req.get("cookies_text"),
                    downloading_annotation=req["downloading_annotation"],
                ),
            )
            logger.info(
                "Spawned background video download for %s", req["url"],
            )
    except Exception:
        logger.exception(
            "Failed to preprocess/record message in %s", history_key,
        )
async def _generate_and_send(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
    history_key: str,
    query_embedding: list[float] | None,
    preprocess_task: asyncio.Task | None = None,
) -> None:
    """Delegate the generate-and-send pipeline to its own module.

    The import is deferred to call time to avoid a circular import
    at module load.
    """
    from .generate_and_send import run_generate_and_send
    await run_generate_and_send(
        self,
        msg,
        platform,
        history_key,
        query_embedding,
        preprocess_task,
    )
# ------------------------------------------------------------------
# Memory-linked metadata resolution (rag_stores + linked_files)
# ------------------------------------------------------------------
async def _resolve_memory_metadata_links(
    self,
    room_context: dict[str, Any] | None,
    msg: IncomingMessage,
    query_embedding: list[float] | None,
) -> str | None:
    """Resolve rag_stores + linked_files metadata via the helper module."""
    from .memory_linked_context import run_resolve_memory_metadata_links
    return await run_resolve_memory_metadata_links(
        self,
        room_context,
        msg,
        query_embedding,
    )
async def _resolve_rag_for_memory(
    self,
    store_names: list[str],
    conv_query: str,
    entity_description: str,
    query_embedding: list[float] | None,
) -> str | None:
    """Run memory-linked RAG retrieval via the helper module."""
    from .memory_linked_context import run_resolve_rag_for_memory
    return await run_resolve_rag_for_memory(
        self,
        store_names,
        conv_query,
        entity_description,
        query_embedding,
    )
async def _resolve_linked_files(
    self,
    file_paths: list[str],
    remaining_budget: int,
) -> tuple[str | None, int]:
    """Load linked files (budget-capped) via the helper module."""
    from .memory_linked_context import run_resolve_linked_files
    return await run_resolve_linked_files(
        self,
        file_paths,
        remaining_budget,
    )
# Hard ceiling on how much of a memory-linked file may be read.
_LINKED_FILE_MAX_BYTES = 10 * 1024 * 1024  # 10 MiB safety ceiling
@staticmethod
def _read_linked_file(path: "Path", max_bytes: int = 0) -> str:
    """Read a memory-linked file's text via the shared helper."""
    from .memory_linked_context import read_linked_file
    return read_linked_file(path, max_bytes)
# ------------------------------------------------------------------
# GameGirl Color button-aware send # 🎮🌀
# ------------------------------------------------------------------
async def _send_reply_maybe_with_buttons(
    self,
    platform: PlatformAdapter,
    channel_id: str,
    reply: str,
) -> str:
    """Deliver *reply*, adding interactive game buttons when applicable.

    With an active GameGirl Color session in this channel, the reply
    is parsed for emoji-prefixed choices which become interactive
    Discord buttons via ``discord.ui.View``. On any renderer problem
    — or with no session — the reply goes out as plain text.
    """
    # Optional dependency: no game_session module means no session.
    try:
        from game_session import get_session
        session = get_session(channel_id)
    except ImportError:
        session = None
    if session is not None and session.active:
        try:
            from game_renderer import build_game_view
            narrative, view = build_game_view(reply, channel_id)
            if view is not None:
                logger.info(
                    "Sending game turn with buttons to %s (game: %s)",
                    channel_id, session.game_name,
                )
                return await platform.send_with_buttons(
                    channel_id, narrative, view=view,
                )
        except ImportError:
            logger.debug("game_renderer not available, sending plain text")
        except Exception:
            logger.exception("Game renderer failed, falling back to plain send")
    # Default path: plain text send.
    return await platform.send(channel_id, reply)
# ------------------------------------------------------------------
# Response regeneration (postprocessing rejection -> retry)
# ------------------------------------------------------------------
async def _trigger_regen(
    self,
    messages: list[dict[str, Any]],
    rejected_reply: str,
    rejection_reason: str,
    *,
    user_id: str = "",
    tool_names: list[str] | None = None,
) -> str:
    """Ask the LLM to revise a reply that failed postprocessing.

    Mutates *messages* in place: the rejected reply is appended as an
    assistant turn, followed by a user-role system notice explaining
    *rejection_reason* and asking for a corrected response. The model
    is then re-invoked so it can produce a fixed reply.

    Parameters
    ----------
    messages:
        The conversation list used for the original call — appended
        to **in place**.
    rejected_reply:
        Full text of the assistant reply that was rejected.
    rejection_reason:
        Human-readable explanation shown to the model so it knows
        what to fix.
    user_id:
        Forwarded to the LLM API for rate-limit / identity tracking.
    tool_names:
        Tools to expose on the retry call; defaults to none so the
        model focuses on text output.

    Returns
    -------
    str
        The replacement response text, or *rejected_reply* when the
        retry itself fails or returns nothing.
    """
    logger.info(
        "Triggering regen — reason: %s (rejected %d chars)",
        rejection_reason,
        len(rejected_reply),
    )
    correction = (
        "[ SYSTEM MESSAGE ] Your previous response was rejected during postprocessing.\n\n"
        f"Reason: {rejection_reason}\n\n"
        "Please revise your response to address the issue above. "
        "Do NOT apologize for the error or mention that you are "
        "revising — just produce the corrected response as if it "
        "were your first attempt. Review the system prompt and make sure the response is compliant with it."
    )
    # Show the model its rejected output, then the correction notice.
    messages.append({"role": "assistant", "content": rejected_reply})
    messages.append({"role": "user", "content": correction})
    retry_tools = [] if tool_names is None else tool_names
    started = time.monotonic()
    try:
        revised = await self.openrouter.chat(
            messages,
            user_id=user_id,
            tool_names=retry_tools,
        )
    except Exception:
        logger.exception("Regen LLM call failed — using original reply")
        return rejected_reply
    logger.info(
        "Regen completed in %.0f ms (%d chars)",
        (time.monotonic() - started) * 1000,
        len(revised),
    )
    return revised or rejected_reply
# ------------------------------------------------------------------
# Limbic exhale (post-response recursive feedback)
# ------------------------------------------------------------------
async def _limbic_exhale(
    self,
    msg: IncomingMessage,
    reply_text: str,
) -> None:
    """Scan user + bot texts for emotional triggers and update the shard.

    This is the second half of the respiration loop:
    - Scan both texts for trigger words
    - Parse matched emotions → delta vectors
    - Combine with sqrt(N) dampening
    - Call exhale() to update the shard + pulse global heart

    Rate-limited to once per 3 seconds per channel to prevent
    multiplicative delta stacking in busy multi-user channels.
    Fully best-effort: any failure is swallowed (debug log only).
    """
    try:
        from ncm_delta_parser import combine_deltas
        limbic = self._ctx_builder._limbic
        if limbic is None:
            # Limbic subsystem disabled/not configured — nothing to do.
            return
        # Exhale throttle: max once per 3s per channel
        import time as _time
        channel_id = str(msg.channel_id)
        # Lazily created so instances built before this feature still work.
        if not hasattr(self, '_last_exhale_time'):
            self._last_exhale_time: dict[str, float] = {}
        last = self._last_exhale_time.get(channel_id, 0.0)
        if (_time.time() - last) < 3.0:
            return
        self._last_exhale_time[channel_id] = _time.time()
        # Scan BOTH user input and bot response via semantic matcher
        # (falls back to exact-word scan_text_for_triggers internally)
        user_matches = await limbic.scan_triggers(msg.text or "")
        bot_matches = await limbic.scan_triggers(reply_text)
        if not user_matches and not bot_matches:
            return
        # Weighted recursion: user input at full strength (external stimulus),
        # bot output at 0.1x (self-modulation without self-amplification).
        # Reduced from 0.3x to 0.1x to prevent the model-in-the-loop
        # feedback cycle where manic output → self-scan → more mania.
        user_deltas = [delta for _, delta in user_matches]
        bot_deltas = [delta for _, delta in bot_matches]
        user_combined = combine_deltas(user_deltas, scale=0.4) if user_deltas else {}
        bot_combined = combine_deltas(bot_deltas, scale=0.1) if bot_deltas else {}
        # Merge: user deltas + dampened bot deltas, each key clamped
        # to ±0.25 so no single emotion can spike.
        combined: dict[str, float] = {}
        for k in set(user_combined) | set(bot_combined):
            combined[k] = user_combined.get(k, 0.0) + bot_combined.get(k, 0.0)
            combined[k] = max(-0.25, min(0.25, combined[k]))
        # Global per-turn magnitude cap: prevents many small deltas
        # from stacking into a catastrophic total shift.
        _MAX_TOTAL_DELTA = 0.6
        total = sum(abs(v) for v in combined.values())
        if total > _MAX_TOTAL_DELTA:
            scale_factor = _MAX_TOTAL_DELTA / total
            combined = {k: v * scale_factor for k, v in combined.items()}
        if not combined:
            return
        channel_id = str(msg.channel_id)
        # Log deltas ordered by magnitude (largest first).
        delta_summary = ", ".join(
            f"{k}:{v:+.3f}"
            for k, v in sorted(
                combined.items(), key=lambda x: abs(x[1]), reverse=True
            )
        )
        logger.info("Limbic Δ (rate-of-change): %s", delta_summary)
        state = await limbic.exhale(
            channel_id, combined,
            user_message=msg.text or "",
            star_reply=reply_text,
            user_id=msg.user_id,
        )
        final_vector = state.get("vector", {}) if state else {}
        state_summary = ", ".join(
            f"{k}:{v:.3f}"
            for k, v in sorted(
                final_vector.items(), key=lambda x: x[1], reverse=True
            )
        )
        logger.info("Limbic state (post-exhale): %s", state_summary)
        # Publish updated state to limbic:exhale pub/sub channel
        # for the /ws/limbic WebSocket endpoint. Zero cost when
        # nobody is subscribed. 💀
        if state and self.message_cache is not None:
            try:
                from limbic_system import LimbicSystem
                from star_avatar import classify_expression
                classified = LimbicSystem.classify_dominant_emotions(
                    final_vector, top_n=5,
                )
                emotion_names = [d["emotion"] for d in classified]
                expression = classify_expression(emotion_names)
                # Top 10 active chemicals
                # (0.5 appears to be the neutral baseline; only values
                # deviating > 0.05 from it are published).
                active_chems = {}
                for k, v in sorted(
                    final_vector.items(),
                    key=lambda x: abs(x[1] - 0.5),
                    reverse=True,
                )[:10]:
                    if abs(v - 0.5) > 0.05:
                        active_chems[k] = round(v, 3)
                meta = state.get("meta_state", {})
                import json as _json
                # Read sprite state for the VN canvas
                sprite_raw = await self.message_cache.redis_client.get(
                    "star:sprite:state"
                )
                sprite_state = (
                    _json.loads(sprite_raw) if sprite_raw else None
                )
                pubsub_payload = _json.dumps({
                    "channel_id": channel_id,
                    "expression": expression,
                    "dominant_emotions": [
                        {"emotion": d["emotion"], "score": d["score"]}
                        for d in classified
                    ],
                    "active_chemicals": active_chems,
                    "cascade_cues": meta.get("cascade_cues", []),
                    "rdf": meta.get("rdf", {}),
                    "user_read": meta.get("user_read", ""),
                    "self_reflection": meta.get("self_reflection", {}),
                    "last_tick": meta.get("last_tick", 0),
                    "sprite_state": sprite_state,
                })
                await self.message_cache.redis_client.publish(
                    "limbic:exhale", pubsub_payload,
                )
            except Exception:
                pass # non-critical, don't break the exhale
        trigger_names = (
            [name for name, _ in user_matches]
            + [name for name, _ in bot_matches]
        )
        logger.info(
            "Limbic exhale: %d triggers → %d delta keys on %s",
            len(trigger_names), len(combined), channel_id,
        )
    except ImportError:
        # ncm_delta_parser not installed — feature silently disabled.
        pass
    except Exception:
        logger.debug("Limbic exhale failed", exc_info=True)
# ------------------------------------------------------------------
# Feature toggle commands
# ------------------------------------------------------------------
# Maps the literal (lower-cased) command text to (feature_key, action).
_TOGGLE_COMMANDS: dict[str, tuple[str, str]] = {
    "!emotions on": ("emotions", "on"),
    "!emotions off": ("emotions", "off"),
    "!rag on": ("rag", "on"),
    "!rag off": ("rag", "off"),
}
# Human-readable labels used in toggle confirmation/denial messages.
_FEATURE_LABELS: dict[str, str] = {
    "emotions": "Emotional simulation (limbic/NCM/cadence)",
    "rag": "RAG system (auto-search & tools)",
}
async def _handle_toggle_command(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> bool:
    """Handle ``!emotions on/off`` and ``!rag on/off``.

    Returns ``True`` when the message was a toggle command (so the
    caller skips normal processing), ``False`` otherwise.
    """
    command = self._TOGGLE_COMMANDS.get((msg.text or "").strip().lower())
    if command is None:
        return False
    feature, action = command
    label = self._FEATURE_LABELS.get(feature, feature)
    # Permission check
    if not feature_toggles.check_toggle_permission(msg, self.config):
        await platform.send(
            msg.channel_id,
            f"⛔ You don't have permission to toggle {label}. "
            f"Requires: bot admin, server/channel admin, or DM.",
        )
        return True
    # Toggles are persisted in Redis — bail out when it's missing.
    redis = (
        self.message_cache.redis_client
        if self.message_cache is not None
        else None
    )
    if redis is None:
        await platform.send(
            msg.channel_id,
            "⚠️ Redis is not configured — feature toggles are unavailable.",
        )
        return True
    channel_key = f"{msg.platform}:{msg.channel_id}"
    disabled = action == "off"
    await feature_toggles.set_disabled(redis, feature, channel_key, disabled)
    state_emoji = "🔴 disabled" if disabled else "🟢 enabled"
    await platform.send(
        msg.channel_id,
        f"**{label}** is now {state_emoji} for this channel.",
    )
    logger.info(
        "Feature toggle: %s %s=%s by %s in %s",
        feature, "disabled" if disabled else "enabled",
        channel_key, msg.user_id, msg.platform,
    )
    return True
# ------------------------------------------------------------------
# Batch handling
# ------------------------------------------------------------------
async def handle_batch(
    self,
    batch: MessageBatch,
    representative: IncomingMessage,
    platform: PlatformAdapter,
) -> None:
    """Process a batch of rapid-succession messages as a single LLM call.

    Combines the text of every message in *batch* into one
    :class:`IncomingMessage` (using *representative* as the base)
    and delegates to :meth:`handle_message` with batch metadata set
    so that the system prompt includes the ``batch_response_context``
    section.
    """
    # A single-message batch needs no synthesis — replay the original.
    if batch.size == 1:
        first: IncomingMessage = batch.messages[0].extra.get(
            "incoming_message", representative,
        )
        await self.handle_message(first, platform)
        return

    text_lines: list[str] = []
    merged_attachments: list[Attachment] = []
    for queued in batch.messages:
        source: IncomingMessage | None = queued.extra.get("incoming_message")
        if source is None:
            # Queued entry without the original message — use raw text.
            text_lines.append(queued.text)
            continue
        merged_attachments.extend(source.attachments)
        if not source.text:
            continue
        header = (
            f"[{source.timestamp.isoformat()}] {source.user_name} ({source.user_id})"
            f" [Message ID: {source.message_id}]"
        )
        if source.reply_to_id:
            header += f" [Replying to: {source.reply_to_id}]"
        if source.reactions:
            header += f" [Reactions: {source.reactions}]"
        text_lines.append(header + " : " + source.text)

    combined_text = "\n".join(line for line in text_lines if line)

    # Inherit is_addressed from the batch: if ANY message was
    # addressed, the combined response should be too. Otherwise
    # the batch must go through the normal proactive gate.
    any_addressed = any(
        queued.extra.get("incoming_message", representative).is_addressed
        for queued in batch.messages
    )

    synthetic = IncomingMessage(
        platform=representative.platform,
        channel_id=representative.channel_id,
        user_id=representative.user_id,
        user_name=representative.user_name,
        text=combined_text,
        is_addressed=any_addressed,
        attachments=merged_attachments,
        channel_name=representative.channel_name,
        timestamp=representative.timestamp,
        message_id=representative.message_id,
        reply_to_id=representative.reply_to_id,
        extra={
            **representative.extra,
            "is_batch_response": True,
            "batch_pre_formatted": True,
            "batch_size": batch.size,
            "batch_authors": batch.unique_authors(),
        },
    )
    logger.info(
        "Processing batch of %d messages on %s:%s",
        batch.size, synthetic.platform, synthetic.channel_id,
    )
    await self.handle_message(synthetic, platform)
# ------------------------------------------------------------------
# Auto-disable proactive on 403 Forbidden
# ------------------------------------------------------------------
async def _disable_proactive_on_forbidden(
    self, platform_name: str, channel_id: str,
) -> None:
    """Disable proactive responses for a channel after a 403 Forbidden.

    Thin delegation to
    :func:`proactive_gates.run_disable_proactive_on_forbidden`;
    imported lazily (presumably to avoid an import cycle — verify).
    """
    from .proactive_gates import run_disable_proactive_on_forbidden
    await run_disable_proactive_on_forbidden(self, platform_name, channel_id)
# ------------------------------------------------------------------
# Proactive response evaluation
# ------------------------------------------------------------------
async def _should_respond_proactively(
    self, msg: IncomingMessage,
) -> bool:
    """Decide whether the bot should reply to *msg* without being addressed.

    Thin delegation to
    :func:`proactive_gates.run_should_respond_proactively`; imported
    lazily (presumably to avoid an import cycle — verify).
    """
    from .proactive_gates import run_should_respond_proactively
    return await run_should_respond_proactively(self, msg)
# ------------------------------------------------------------------
# History backfill
# ------------------------------------------------------------------
async def _backfill_history(
    self,
    history_key: str,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> None:
    """Backfill conversation history for *history_key*.

    Thin delegation to :func:`history_backfill.run_backfill_history`;
    imported lazily (presumably to avoid an import cycle — verify).
    """
    from .history_backfill import run_backfill_history
    await run_backfill_history(self, history_key, msg, platform)
async def _backfill_from_redis(
    self,
    platform_name: str,
    channel_id: str,
    history_key: str,
) -> int:
    """Backfill history for a channel from Redis.

    Thin delegation to :func:`history_backfill.run_backfill_from_redis`.
    Returns the delegate's ``int`` result (likely the number of entries
    restored — confirm against ``history_backfill``).
    """
    from .history_backfill import run_backfill_from_redis
    return await run_backfill_from_redis(
        self, platform_name, channel_id, history_key,
    )
async def _backfill_from_platform(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
    history_key: str,
) -> None:
    """Backfill history by fetching messages from the platform itself.

    Thin delegation to
    :func:`history_backfill.run_backfill_from_platform`; imported lazily
    (presumably to avoid an import cycle — verify).
    """
    from .history_backfill import run_backfill_from_platform
    await run_backfill_from_platform(self, msg, platform, history_key)
# ------------------------------------------------------------------
# Embedding helpers
# ------------------------------------------------------------------
# Max items sent per direct ``embed_batch`` call in ``_ensure_embeddings``;
# kept in sync with the queue's own API batch cap.
_EMBED_BATCH_SIZE = 50  # match embedding_queue.API_BATCH_LIMIT
async def _ensure_embeddings(
self,
items: list[tuple[str, str]],
) -> None:
"""Ensure every ``(redis_key, text)`` pair gets an embedding.
When the :class:`~embedding_queue.EmbeddingBatchQueue` is
available the items are enqueued there (the queue handles
batching internally). Otherwise, embeddings are generated
directly via :meth:`OpenRouterClient.embed_batch` in chunks
and written back to Redis in a pipeline.
"""
if not items:
return
if self._embedding_queue is not None:
await self._embedding_queue.enqueue_many(items)
return
if self.message_cache is None:
return
redis = self.message_cache.redis_client
for i in range(0, len(items), self._EMBED_BATCH_SIZE):
chunk = items[i : i + self._EMBED_BATCH_SIZE]
texts = [text for _, text in chunk]
try:
embeddings = await self.openrouter.embed_batch(
texts, self.config.embedding_model,
)
except Exception:
logger.warning(
"Direct batch embedding failed for %d items",
len(chunk), exc_info=True,
)
continue
pipe = redis.pipeline()
for (key, _), emb in zip(chunk, embeddings):
blob = np.array(emb, dtype=np.float32).tobytes()
pipe.hset(key, "embedding", blob)
try:
await pipe.execute()
except Exception:
logger.warning(
"Failed to write %d embeddings to Redis",
len(chunk), exc_info=True,
)
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
async def _build_multimodal_content(
text: str,
attachments: list[Attachment],
) -> list[dict[str, Any]]:
"""Convert text + attachments into OpenRouter content parts.
Text-decodable attachments (source code, config files, plain text,
etc.) are decoded and inlined directly into the text body so the
LLM can read them regardless of proxy multimodal support. Binary
attachments (images, PDFs, audio, video) are kept as multimodal
content parts.
"""
text_body = text or ""
multimodal_parts: list[dict[str, Any]] = []
for att in attachments:
if _is_text_decodable(att.mimetype, att.filename, att.data):
try:
decoded = att.data.decode("utf-8")
except UnicodeDecodeError:
decoded = att.data.decode("latin-1")
text_body += (
f"\n\n[Attached file: {att.filename}]\n"
f"{decoded}\n"
f"[End of file: {att.filename}]"
)
logger.debug(
"Inlined text file %s (%s, %d bytes) into message body",
att.filename, att.mimetype, len(att.data),
)
else:
att_parts = await media_to_content_parts(
att.data, att.mimetype, att.filename,
)
multimodal_parts.extend(att_parts)
logger.debug(
"Kept %s (%s, %d bytes) as %d multimodal part(s)",
att.filename, att.mimetype, len(att.data),
len(att_parts),
)
parts: list[dict[str, Any]] = []
if text_body:
parts.append({"type": "text", "text": text_body})
parts.extend(multimodal_parts)
return parts if parts else [{"type": "text", "text": ""}]
async def _download_and_patch_video(
    self,
    url: str,
    user_id: str,
    history_key: str,
    message_id: str,
    metadata: dict[str, Any],
    cookies_text: str | None = None,
    downloading_annotation: str = "",
) -> None:
    """Download the video at *url* and patch it into conversation history.

    Thin delegation to
    :func:`video_history_patch.run_download_and_patch_video`; imported
    lazily (presumably to avoid an import cycle — verify). See that
    function for the exact semantics of *metadata*, *cookies_text* and
    *downloading_annotation*.
    """
    from .video_history_patch import run_download_and_patch_video
    await run_download_and_patch_video(
        self,
        url,
        user_id,
        history_key,
        message_id,
        metadata,
        cookies_text=cookies_text,
        downloading_annotation=downloading_annotation,
    )
async def _extract_knowledge_from_message(
    self, msg: IncomingMessage,
) -> None:
    """Fire-and-forget per-message knowledge extraction.

    Three cheap gates run *before* any LLM call:
    length, heuristic regex, and per-user rate limit.
    Failures are logged and swallowed so message handling proceeds.
    """
    try:
        from kg_extraction import extract_from_message

        cache = self.message_cache
        redis = cache.redis_client if cache is not None else None
        await extract_from_message(
            message_text=msg.text,
            user_id=msg.user_id,
            user_name=msg.user_name,
            channel_id=msg.channel_id,
            guild_id=msg.extra.get("guild_id") or None,
            openrouter=self.openrouter,
            kg_manager=self.kg_manager,
            redis=redis,
            per_user_limit=self.config.kg_per_user_extraction_limit,
        )
    except Exception:
        # Best-effort: extraction must never break message handling.
        logger.warning(
            "Per-message knowledge extraction failed",
            exc_info=True,
        )