Source code for message_processor.processor

"""Central message processor shared across all platforms.

Receives :class:`~platforms.base.IncomingMessage` instances from any
:class:`~platforms.base.PlatformAdapter`, manages conversation history,
calls the LLM via :class:`~openrouter_client.OpenRouterClient`, and
sends the reply back through the originating platform.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
import time
import re
import traceback
import httpx
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Coroutine, Optional, Type
from types import TracebackType
from observability import observability

import numpy as np

from config import Config
from conversation import ConversationManager
from message_queue import MessageBatch, QueuedMessage
from observability import publish_message_observability_event, publish_debug_event
from openrouter_client import OpenRouterClient
from platforms.base import Attachment, IncomingMessage, PlatformAdapter
from platforms.media_common import media_to_content_parts
from prompt_context import PromptContextBuilder
import ego_ablation
import feature_toggles
import user_llm_config
from patience_engine import ShadowBanManager, ShadowEffect, _simulate_retry_exhaust
from url_content_extractor import extract_all_url_content

from .bot_commands import is_immediate_bot_command
from .memory_linked_context import MemoryLinkedContextPayload
from .population_gate import _POPULATION_EXEMPT_GUILDS, _POPULATION_LIMIT
from .text_attachments import _is_text_decodable
from .unreadable_text import maybe_truncate_unreadable
from .user_message_format import format_user_content

# User-uploaded text attachments larger than this are stripped from the LLM
# payload and replaced with a single-line notice.  Memory-linked files, tool
# results, and assistant history use separate call paths and are unaffected.
_MAX_USER_TEXT_ATTACHMENT_BYTES = 2 * 1024 * 1024  # 2 MB

if TYPE_CHECKING:
    from classifiers.vector_classifier import VectorClassifier
    from embedding_queue import EmbeddingBatchQueue
    from knowledge_graph import KnowledgeGraphManager
    from message_cache import MessageCache
    from rag_system.auto_search import RAGAutoSearchManager
    from status_manager import StatusManager
    from task_manager import TaskManager
    from threadweave import ThreadweaveManager
    from web_search_context import WebSearchContextManager

logger = logging.getLogger(__name__)


def _fire_and_forget(coro, *, name: str = "") -> asyncio.Task:
    """Schedule a coroutine as a fire-and-forget task with exception logging."""

    async def _wrapper():
        try:
            await coro
        except asyncio.CancelledError:
            pass
        except Exception:
            logger.warning("Fire-and-forget task %r failed", name, exc_info=True)

    return asyncio.create_task(_wrapper(), name=name or None)


class ChannelMutex:
    """Context manager enforcing chronological processing via Redis distributed locks."""

    def __init__(
        self,
        redis_client: Any,
        channel_id: str,
        lease_seconds: int = 30,
        *,
        raise_on_timeout: bool = True,
        wait_timeout_seconds: float = 10.0,
    ) -> None:
        self.redis = redis_client
        self.lock_key = f"lock:channel:processing:{channel_id}"
        self.lease = lease_seconds
        current_task = asyncio.current_task()
        task_name = current_task.get_name() if current_task else "main"
        self.lock_value = f"worker:{task_name}"
        self.acquired = False
        # When False, a failure to acquire within ``wait_timeout_seconds`` is
        # best-effort: log + proceed without the lock instead of raising. Used
        # by background updates (edits/deletes/reactions/heartbeat) that must
        # still apply even under contention, while normally serializing against
        # the main inference turn for the same channel.
        self.raise_on_timeout = raise_on_timeout
        self.wait_timeout_seconds = wait_timeout_seconds

    async def __aenter__(self) -> "ChannelMutex":
        timeout = self.wait_timeout_seconds  # Max block wait time before requeueing
        elapsed = 0.0
        interval = 0.2

        while elapsed < timeout:
            with observability.timer("redis_lock_acquire_latency", subsystem="message_processor"):
                res = await self.redis.set(
                    self.lock_key, self.lock_value, ex=self.lease, nx=True
                )
            if res:
                self.acquired = True
                observability.increment(
                    "channel_lock_acquired", {"channel": self.lock_key}
                )
                return self
            await asyncio.sleep(interval)
            elapsed += interval

        observability.increment(
            "message_requeued_lock_collision", {"channel": self.lock_key}
        )
        if not self.raise_on_timeout:
            logger.warning(
                "Proceeding without channel lock %s after %.1fs (best-effort)",
                self.lock_key,
                timeout,
            )
            return self
        raise TimeoutError(
            f"Failed to acquire channel lock for key: {self.lock_key} within {timeout}s."
        )

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        if self.acquired:
            lua_release = """
            if redis.call('get', KEYS[1]) == ARGV[1] then
                return redis.call('del', KEYS[1])
            else
                return 0
            end
            """
            try:
                await self.redis.eval(lua_release, 1, self.lock_key, self.lock_value)
                observability.increment(
                    "channel_lock_released", {"channel": self.lock_key}
                )
                logger.debug("Released lock: %s", self.lock_key)
            except Exception as e:
                logger.warning("Error releasing lock %s: %s", self.lock_key, str(e))


class ClientRegistry:
    """Manages persistent connection pools for outbound platform/REST dispatches."""

    def __init__(self) -> None:
        self.http_client: Optional[httpx.AsyncClient] = None

    async def initialize(self) -> None:
        limits = httpx.Limits(max_keepalive_connections=50, max_connections=200)
        self.http_client = httpx.AsyncClient(
            limits=limits, timeout=httpx.Timeout(30.0, connect=5.0)
        )

    async def shutdown(self) -> None:
        if self.http_client:
            await self.http_client.aclose()
            self.http_client = None


def redact_sensitive_data(text: str) -> str:
    """Regular expression filter to strip active tokens and private details from traces."""
    filters = [
        (r"xox[p|b|a|y]-[0-9a-zA-Z\-]+", "[REDACTED-DISCORD-TOKEN]"),
        (r"ghp_[0-9a-zA-Z]{36}", "[REDACTED-GITHUB-TOKEN]"),
        (r"sk-or-v1-[0-9a-f]{64}", "[REDACTED-OPENROUTER-KEY]"),
        (r"(?i)password\s*=\s*['\"][^'\"]+['\"]", "password=[REDACTED]"),
        (
            r"(?i)wallet_master_key\s*=\s*['\"][^'\"]+['\"]",
            "wallet_master_key=[REDACTED]",
        ),
    ]
    redacted = text
    for pattern, replacement in filters:
        redacted = re.sub(pattern, replacement, redacted)
    return redacted


async def safe_command_dispatcher(
    ctx: Any,
    command_func: Callable[..., Coroutine[Any, Any, str]],
    *args: Any,
    **kwargs: Any,
) -> str:
    """Dispatches a command coroutine safely, logging exceptions and redacting tracebacks."""
    correlation_id = getattr(ctx, "correlation_id", "unknown")
    try:
        with observability.timer("command_execution_time", subsystem="message_processor"):
            return await command_func(ctx, *args, **kwargs)
    except Exception as e:
        raw_traceback = traceback.format_exc()
        redacted_traceback = redact_sensitive_data(raw_traceback)
        logger.error(
            "Exception occurred in command dispatcher [ID: %s]: %s\n%s",
            correlation_id,
            str(e),
            redacted_traceback,
        )
        observability.increment("command_error", {"exception": e.__class__.__name__})
        observability.alert(
            "ERROR",
            f"Command dispatch crashed: {str(e)}",
            metadata={"trace": redacted_traceback, "correlation_id": correlation_id},
        )
        return "⚠️ An internal error occurred while processing your request. The issue has been securely logged."


[docs] class MessageProcessor: """Platform-agnostic message handler. Parameters ---------- config: Global bot configuration (forwarded to the context builder). conversation_manager: Manages per-channel conversation histories. openrouter: The OpenRouter API client for LLM completions. message_cache: Optional Redis-backed message cache. When provided, every message the bot sees is logged with an embedding vector. """
[docs] def __init__( self, config: Config, conversation_manager: ConversationManager, openrouter: OpenRouterClient, message_cache: MessageCache | None = None, kg_manager: KnowledgeGraphManager | None = None, auto_search: RAGAutoSearchManager | None = None, task_manager: TaskManager | None = None, classifier: VectorClassifier | None = None, threadweave: ThreadweaveManager | None = None, embedding_queue: EmbeddingBatchQueue | None = None, status_manager: StatusManager | None = None, web_search: WebSearchContextManager | None = None, persona_pref_manager: Any | None = None, shadow_ban_manager: ShadowBanManager | None = None, sword_graph_manager: Any | None = None, visual_memory: Any | None = None, ) -> None: """Initialize the instance. Args: config (Config): Bot configuration object. conversation_manager (ConversationManager): The conversation manager value. openrouter (OpenRouterClient): The openrouter value. message_cache (MessageCache | None): The message cache value. kg_manager (KnowledgeGraphManager | None): The kg manager value. auto_search (RAGAutoSearchManager | None): The auto search value. task_manager (TaskManager | None): The task manager value. classifier (VectorClassifier | None): The classifier value. threadweave (ThreadweaveManager | None): The threadweave value. embedding_queue (EmbeddingBatchQueue | None): The embedding queue value. status_manager (StatusManager | None): The status manager value. web_search (WebSearchContextManager | None): The web search value. persona_pref_manager (Any | None): PersonaPreferenceManager instance. shadow_ban_manager (ShadowBanManager | None): Shadow ban manager instance. sword_graph_manager (Any | None): SWORD graph manager instance. visual_memory (Any | None): VisualMemoryEngine for cross-channel image pattern recognition. """ self.config = config self.conversation = conversation_manager self.openrouter = openrouter self.message_cache = message_cache self.kg_manager = kg_manager self._auto_search = auto_search self._task_manager = task_manager self._classifier = classifier self._threadweave = threadweave self._embedding_queue = embedding_queue self._status_manager = status_manager self._web_search = web_search self.persona_pref_manager = persona_pref_manager self._shadow_ban = shadow_ban_manager self._visual_memory = visual_memory # 👀 cross-channel visual recognition self.client_registry = ClientRegistry() self._backfilled_channels: set[str] = set() self._ctx_builder = PromptContextBuilder( config, kg_manager=kg_manager, threadweave_manager=threadweave, status_manager=status_manager, message_cache=message_cache, task_manager=task_manager, conversation_manager=conversation_manager, openrouter_client=self.openrouter, persona_pref_manager=persona_pref_manager, sword_graph_manager=sword_graph_manager, ) # 💀 Cadence Post-Processor — code-level text degradation self._cadence_processor = None try: from cadence_refiner import CadencePostProcessor self._cadence_processor = CadencePostProcessor() except ImportError: logger.debug("cadence_refiner not available") from message_processor.command_router import CommandRouter self.command_router = CommandRouter(self) # Wire WorkerCancellationDaemon to initialization if message_cache/redis is present self._cancellation_daemon_task = None if self.message_cache is not None: try: loop = asyncio.get_running_loop() self._cancellation_daemon_task = loop.create_task( self._run_cancellation_daemon(), name="sg-worker-cancellation-daemon", ) except RuntimeError: # No running event loop (e.g. sync scripts, unit tests) pass
# ------------------------------------------------------------------ # Distributed Response Cancellation (Worker Daemon) # ------------------------------------------------------------------ async def _run_cancellation_daemon(self) -> None: """Background daemon subscribing to 'sg:channel:cancel' to abort local turns.""" if self.message_cache is None: return redis = self.message_cache.redis_client pubsub = redis.pubsub() try: await pubsub.subscribe("sg:channel:cancel") logger.info("WorkerCancellationDaemon successfully subscribed to 'sg:channel:cancel'") async for message in pubsub.listen(): if message["type"] != "message": continue try: data_str = ( message["data"] if isinstance(message["data"], str) else message["data"].decode() ) data = json.loads(data_str) platform = data.get("platform") channel_id = data.get("channel_id") channel_key = data.get("channel_key", f"{platform}:{channel_id}") user_id = data.get("user_id") logger.info( "WorkerCancellationDaemon: Received cancel signal for channel %s initiated by %s", channel_key, user_id, ) cancelled_any = False for task in asyncio.all_tasks(): if task.get_name() == f"inference:{channel_key}": logger.info( "WorkerCancellationDaemon: Tagging task %s as user-cancelled and executing task.cancel()", task.get_name() ) setattr(task, "_user_cancelled", True) task.cancel() cancelled_any = True if not cancelled_any: logger.debug( "WorkerCancellationDaemon: No active task named 'inference:%s' found on this worker.", channel_key ) except Exception: logger.exception("WorkerCancellationDaemon failed to process cancellation message") except asyncio.CancelledError: logger.info("WorkerCancellationDaemon cancelled") except Exception: logger.exception("WorkerCancellationDaemon listener crashed") finally: try: await pubsub.unsubscribe("sg:channel:cancel") await pubsub.aclose() except Exception: pass
[docs] async def shutdown(self) -> None: """Shutdown background processes and cancellation daemons.""" if hasattr(self, "_cancellation_daemon_task") and self._cancellation_daemon_task and not self._cancellation_daemon_task.done(): logger.info("Stopping WorkerCancellationDaemon...") self._cancellation_daemon_task.cancel() try: await self._cancellation_daemon_task except asyncio.CancelledError: pass logger.info("WorkerCancellationDaemon stopped.")
async def _cleanup_egregore_webhooks( self, platform: PlatformAdapter, discord_cleanups: list[tuple[str, str]], ) -> None: """Delete egregore Discord webhooks, working from any process. On the gateway the adapter exposes the live ``client`` and we delete directly; on an inference worker the adapter is a :class:`core.proxy_adapter.ProxyPlatformAdapter` (no live client), so we delegate the deletion to the gateway over RPC (``delete_egregore_webhook``). Pre-T3 this used ``getattr(platform, "client", None)`` unconditionally and silently no-opped on workers, orphaning the webhooks. """ if not discord_cleanups: return _client = getattr(platform, "client", None) or getattr(platform, "_client", None) delegate = getattr(platform, "delegate_to_gateway", None) for gid, whid in discord_cleanups: try: if _client is not None: from tools._egregore_discord import delete_egregore_webhook_by_id await delete_egregore_webhook_by_id(_client, gid, whid) elif delegate is not None: await delegate( "delete_egregore_webhook", guild_id=gid, webhook_id=whid ) else: logger.debug( "No client or gateway delegate to delete webhook %s/%s", gid, whid, ) except Exception: logger.debug( "egregore webhook cleanup failed gid=%s wid=%s", gid, whid, exc_info=True, ) # ------------------------------------------------------------------ # Pending-response tracking (Redis) # ------------------------------------------------------------------ _PENDING_PREFIX = "pending_response:" _PENDING_TTL = 3600 # 1 hour safety net async def _mark_pending(self, msg: IncomingMessage) -> None: """Write a Redis marker for an in-flight response. The marker is removed by :meth:`_clear_pending` once the reply has been sent. If the bot crashes in between, the marker survives and is picked up by :meth:`recover_pending_responses` on the next startup. """ if self.message_cache is None: return key = f"{self._PENDING_PREFIX}{msg.platform}:{msg.channel_id}" mapping = { "platform": msg.platform, "channel_id": msg.channel_id, "user_id": msg.user_id, "user_name": msg.user_name, "text": msg.text or "", "message_id": msg.message_id or "", "reply_to_id": msg.reply_to_id or "", "channel_name": msg.channel_name or "", "is_addressed": "1" if msg.is_addressed else "0", "timestamp": msg.timestamp.isoformat(), "started_at": datetime.now(timezone.utc).isoformat(), } try: pipe = self.message_cache.redis_client.pipeline() pipe.hset(key, mapping=mapping) pipe.expire(key, self._PENDING_TTL) await pipe.execute() except Exception: logger.warning("Failed to mark pending response", exc_info=True) async def _clear_pending(self, msg: IncomingMessage) -> None: """Remove the in-flight marker after a response completes.""" if self.message_cache is None: return key = f"{self._PENDING_PREFIX}{msg.platform}:{msg.channel_id}" try: await self.message_cache.redis_client.delete(key) except Exception: logger.debug("Failed to clear pending response", exc_info=True)
[docs] async def recover_pending_responses( self, enqueue_callback: Callable[[IncomingMessage, PlatformAdapter], Awaitable[None]], adapters: list[PlatformAdapter], ) -> int: """Re-enqueue messages whose responses were interrupted by a restart. Called once during startup after all platform adapters are running. Returns the number of recovered messages. """ if self.message_cache is None: return 0 adapter_map: dict[str, PlatformAdapter] = {a.name: a for a in adapters} redis = self.message_cache.redis_client recovered = 0 try: cursor: int | bytes = 0 keys: list[bytes | str] = [] while True: cursor, batch = await redis.scan( cursor, match=f"{self._PENDING_PREFIX}*", count=100, ) keys.extend(batch) if not cursor: break except Exception: logger.warning( "Failed to scan for pending responses", exc_info=True, ) return 0 for raw_key in keys: key = raw_key.decode() if isinstance(raw_key, bytes) else raw_key try: data = await redis.hgetall(key) if not data: await redis.delete(key) continue def _s(field: str) -> str: v = data.get(field, b"") return v.decode() if isinstance(v, bytes) else (v or "") platform_name = _s("platform") adapter = adapter_map.get(platform_name) if adapter is None: logger.warning( "No adapter for pending response platform %r — deleting key %s", platform_name, key, ) await redis.delete(key) continue ts_raw = _s("timestamp") try: ts = datetime.fromisoformat(ts_raw) except (ValueError, TypeError): ts = datetime.now(timezone.utc) msg = IncomingMessage( platform=platform_name, channel_id=_s("channel_id"), user_id=_s("user_id"), user_name=_s("user_name"), text=_s("text"), is_addressed=_s("is_addressed") == "1", channel_name=_s("channel_name"), timestamp=ts, message_id=_s("message_id"), reply_to_id=_s("reply_to_id"), extra={"is_recovery": True}, ) await redis.delete(key) _fire_and_forget( publish_debug_event( "pending_recovery", "message_processor", platform=platform_name, channel_id=msg.channel_id, user_id=msg.user_id, preview=msg.text[:500] if msg.text else "", payload={ "original_ts": ts_raw, "recovered_at": datetime.now(timezone.utc).isoformat(), }, ), name="obs_pending_recovery", ) await enqueue_callback(msg, adapter) recovered += 1 logger.info( "Recovered pending response for %s:%s (user=%s)", platform_name, msg.channel_id, msg.user_name, ) except Exception: logger.warning( "Failed to recover pending response from %s", key, exc_info=True, ) try: await redis.delete(key) except Exception: pass if recovered: logger.info("Recovered %d pending response(s) from previous run", recovered) return recovered
async def _is_backfilled(self, history_key: str) -> bool: """Check if history for history_key has been backfilled.""" if self.message_cache is not None: try: return bool(await self.message_cache.redis_client.sismember("sg:backfilled", history_key)) except Exception: pass return history_key in self._backfilled_channels async def _record_backfilled(self, history_key: str) -> None: """Mark history_key as backfilled in Redis set and local set.""" self._backfilled_channels.add(history_key) if self.message_cache is not None: try: redis = self.message_cache.redis_client await redis.sadd("sg:backfilled", history_key) await redis.expire("sg:backfilled", 86400) # 24h TTL except Exception: pass async def _discard_backfilled(self, history_key: str) -> None: """Remove history_key from backfilled sets.""" self._backfilled_channels.discard(history_key) if self.message_cache is not None: try: await self.message_cache.redis_client.srem("sg:backfilled", history_key) except Exception: pass async def _begin_backfill(self, history_key: str) -> bool: """Acquire a short-lived in-progress marker for a channel backfill. Returns ``True`` when this caller may proceed with the backfill, or ``False`` when another handler is already backfilling the same channel (so we avoid a duplicate load that would double-append history or race the platform dedup snapshot). Fail-open: returns ``True`` when Redis is unavailable so a backfill is never blocked by the marker. """ if self.message_cache is None: return True try: got = await self.message_cache.redis_client.set( f"sg:backfilling:{history_key}", "1", ex=120, nx=True, ) return bool(got) except Exception: return True async def _end_backfill(self, history_key: str) -> None: """Release the in-progress backfill marker.""" if self.message_cache is None: return try: await self.message_cache.redis_client.delete( f"sg:backfilling:{history_key}", ) except Exception: pass def _channel_lock(self, channel_id: str, *, raise_on_timeout: bool = True): """Per-channel processing mutex shared with the main inference turn. Background updates (message edits/deletes/reactions and channel heartbeats) serialize against an in-flight turn for the same channel. Returns a no-op context when Redis is unavailable. On lock timeout the mutation is skipped rather than applied without the lock. """ import contextlib redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) if redis is None: return contextlib.nullcontext() return ChannelMutex( redis, channel_id, raise_on_timeout=raise_on_timeout, wait_timeout_seconds=30.0, ) async def _resolve_identity(self, msg: IncomingMessage) -> None: """Resolve ingress identity and populate unified ID / aliases on message.""" if msg.extra.get("_identity_resolved"): return redis_client = self.message_cache.redis_client if self.message_cache else None if redis_client is not None: try: from services.identity_registry import IdentityRegistry uuid, aliases = await IdentityRegistry.resolve_ingress_identity( msg.platform, msg.user_id, redis_client ) msg.unified_user_id = uuid msg.user_aliases = aliases except Exception as e: logger.error("Error resolving identity for %s:%s - %s", msg.platform, msg.user_id, e) msg.extra["_identity_resolved"] = True def _restricted_access_blocks(self, msg: IncomingMessage) -> bool: """True if the population kill-switch refuses this message.""" _member_count = msg.extra.get("member_count", 0) or 0 _guild_id = str(msg.extra.get("guild_id", "") or "") if ( _member_count > _POPULATION_LIMIT and _guild_id not in _POPULATION_EXEMPT_GUILDS ): logger.warning( "🚨 RESTRICTED ACCESS PROTOCOL: environment '%s' has %d " "members (limit: %d) — classified as hostile, refusing " "to operate", msg.extra.get("guild_name") or msg.channel_name, _member_count, _POPULATION_LIMIT, ) return True return False # ------------------------------------------------------------------ # Public handler -- called by every platform adapter # ------------------------------------------------------------------
[docs] async def handle_immediate_command( self, msg: IncomingMessage, platform: PlatformAdapter, *, _population_checked: bool = False, ) -> None: """Handle ``!`` admin/toggle/user-LLM commands without embed or LLM.""" await self._resolve_identity(msg) if not _population_checked and self._restricted_access_blocks(msg): return # D2-01: Enforce STARGAZER_USE here (not just in handle_message) so # that callers like handle_batch that bypass handle_message cannot # skip the access gate. from tools.alter_privileges import has_scoped_privilege, PRIVILEGES _redis_use = self.message_cache.redis_client if self.message_cache else None _guild_id_use = str(msg.extra.get("guild_id", "") or "") _channel_id_use = str( msg.extra.get("channel_id_raw", msg.channel_id) or "", ) try: _has_access_imm = _redis_use is not None and await has_scoped_privilege( _redis_use, msg.user_id, PRIVILEGES["STARGAZER_USE"], self.config, guild_id=_guild_id_use, channel_id=_channel_id_use, user_aliases=msg.user_aliases, ) except Exception: logger.warning( "[security] STARGAZER_USE Redis error (immediate_cmd) user %s — failing closed", msg.user_id, exc_info=True, ) _has_access_imm = False if not _has_access_imm: logger.debug( "[security] STARGAZER_USE denied (immediate_cmd): user=%s redis=%s", msg.user_id, "ok" if _redis_use else "unavailable", ) return # Fail-closed history_key = f"{msg.platform}:{msg.channel_id}" from observability import generate_request_id msg.extra.setdefault("observability_request_id", generate_request_id()) text_lower = (msg.text or "").strip().lower() # -- Identity Aliasing: !link and !link confirm -- if text_lower == "!link" or text_lower.startswith("!link "): _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send(msg.channel_id, "❌ Database connection not available.") return parts = msg.text.strip().split() if len(parts) == 1: await platform.send( msg.channel_id, "ℹ️ **Account Aliasing Usage:**\n" "• Initiate link: `!link <target_platform> <target_user_id>` (e.g. `!link matrix @username:matrix.org`)\n" "• Confirm link: `!link confirm <code>`" ) return subcmd = parts[1].lower() if subcmd == "confirm": if len(parts) < 3: await platform.send( msg.channel_id, "❌ Please provide the 6-character link code: `!link confirm <code>`" ) return code = parts[2].upper() try: from services.identity_registry import IdentityRegistry success, result = await IdentityRegistry.confirm_link_token( confirming_platform=msg.platform, confirming_user_id=msg.user_id, link_code=code, redis_client=_redis, ) if success: await platform.send( msg.channel_id, f"✅ **Success!** Accounts linked successfully under unified identity `{result}`." ) msg.extra["_identity_resolved"] = False await self._resolve_identity(msg) else: await platform.send(msg.channel_id, f"❌ **Error:** {result}") except Exception as ex: logger.exception("Error during account confirmation link") await platform.send(msg.channel_id, f"❌ **An unexpected error occurred:** {ex}") return else: if len(parts) < 3: await platform.send( msg.channel_id, "❌ Invalid format. Use: `!link <target_platform> <target_user_id>` (e.g., `!link matrix @user:matrix.org`)" ) return target_plat = parts[1].lower() target_uid = parts[2] if target_plat == msg.platform and target_uid == msg.user_id: await platform.send( msg.channel_id, "❌ You cannot link an account to itself." ) return try: from services.identity_registry import IdentityRegistry code = await IdentityRegistry.create_link_token( initiator_platform=msg.platform, initiator_user_id=msg.user_id, target_platform=target_plat, target_user_id=target_uid, redis_client=_redis, ) await platform.send( msg.channel_id, f"🔒 **Verification Token Generated!**\n" f"Pairing code: **`{code}`**\n" f"To complete the link, switch to your **{target_plat}** account and run:\n" f"`!link confirm {code}`\n" f"*(This token is locked to {target_plat}:{target_uid} and will expire in 5 minutes)*" ) except Exception as ex: logger.exception("Error during account initiation link") await platform.send(msg.channel_id, f"❌ **Error initiating link:** {ex}") return # -- Scene Consent Gate: !consent # 💀🔥😈 -- # Scene-level consent model. Users approve scenes they're willing # to enter. Safe scenes need no consent. Negotiation scenes do. # Usage: !consent approve all | !consent <scene> | !consent deny <scene> | !consent status if text_lower == "!consent" or text_lower.startswith("!consent "): _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send(msg.channel_id, "\u274c Redis unavailable.") return parts = (msg.text or "").strip().split() # !consent (no args) or !consent status -> show current state if len(parts) == 1 or (len(parts) == 2 and parts[1].lower() == "status"): try: from chaos_switch._scenes import ( get_scene_consent, NEGOTIATION_SCENES, SCENE_CONSENT_DESCRIPTIONS, ) approved = await get_scene_consent(_redis, msg.user_id) neg_approved = approved & NEGOTIATION_SCENES neg_denied = NEGOTIATION_SCENES - approved lines = ["\U0001f512 **Scene Consent Status**\n"] if neg_approved: lines.append("**Approved:**") for s in sorted(neg_approved): desc = SCENE_CONSENT_DESCRIPTIONS.get(s, "") lines.append(f"\u2705 `{s}` \u2014 {desc}") if neg_denied: lines.append("\n**Not approved:**") for s in sorted(neg_denied): desc = SCENE_CONSENT_DESCRIPTIONS.get(s, "") lines.append(f"\u274c `{s}` \u2014 {desc}") if not neg_approved and not neg_denied: lines.append("No negotiation scenes configured.") lines.append( f"\n**{len(neg_approved)}/{len(NEGOTIATION_SCENES)}** " f"negotiation scenes approved.\n" f"Grant: `!consent <scene>` | `!consent approve all`\n" f"Revoke: `!consent deny <scene>`\n" f"View all: `!scenes`" ) await platform.send(msg.channel_id, "\n".join(lines)) except Exception as exc: await platform.send(msg.channel_id, f"\u274c Consent check failed: {exc}") return subcmd = parts[1].lower() # !consent approve all -> blanket approve if subcmd == "approve" and len(parts) >= 3 and parts[2].lower() == "all": try: from chaos_switch._scenes import ( grant_all_scene_consent, NEGOTIATION_SCENES, ) count = await grant_all_scene_consent(_redis, msg.user_id) await platform.send( msg.channel_id, f"\u2705 **All {count} negotiation scenes approved.**\n" f"You can enter any scene. Use `!consent deny <scene>` to revoke specific ones.", ) except Exception as exc: await platform.send(msg.channel_id, f"\u274c Consent grant failed: {exc}") return # !consent deny <scene> -> revoke specific scene if subcmd == "deny": if len(parts) < 3: await platform.send( msg.channel_id, "\u274c Usage: `!consent deny <scene_name>`\n" "Use `!scenes` to see available scenes.", ) return scene_name = "_".join(parts[2:]).lower().strip() try: from chaos_switch._scenes import ( deny_scene_consent, NEGOTIATION_SCENES, SCENE_CONSENT_DESCRIPTIONS, get_scene_consent, ) if scene_name not in NEGOTIATION_SCENES: await platform.send( msg.channel_id, f"\u274c `{scene_name}` is not a negotiation scene " f"(either safe or unknown).\nUse `!scenes` to see all scenes.", ) return await deny_scene_consent(_redis, msg.user_id, scene_name) desc = SCENE_CONSENT_DESCRIPTIONS.get(scene_name, "") approved = await get_scene_consent(_redis, msg.user_id) count = len(approved & NEGOTIATION_SCENES) await platform.send( msg.channel_id, f"\U0001f513 **Consent revoked** for `{scene_name}` ({desc}).\n" f"**{count}/{len(NEGOTIATION_SCENES)}** negotiation scenes approved.", ) except Exception as exc: await platform.send(msg.channel_id, f"\u274c Consent deny failed: {exc}") return # !consent <scene> -> approve specific scene scene_name = "_".join(parts[1:]).lower().strip() try: from chaos_switch._scenes import ( grant_scene_consent as _grant_sc, NEGOTIATION_SCENES, SAFE_SCENES, SCENE_CONSENT_DESCRIPTIONS, get_scene_consent, SCENE_FRAMES, ) if scene_name in SAFE_SCENES: desc = SCENE_CONSENT_DESCRIPTIONS.get(scene_name, "") await platform.send( msg.channel_id, f"\u2139\ufe0f `{scene_name}` ({desc}) is a **safe scene** \u2014 " f"no consent needed, you can always enter it.", ) return if scene_name not in NEGOTIATION_SCENES: # Check if it's a valid scene at all if scene_name in SCENE_FRAMES: await platform.send( msg.channel_id, f"\u2139\ufe0f `{scene_name}` exists but isn't classified " f"as a negotiation scene.", ) else: await platform.send( msg.channel_id, f"\u274c Unknown scene `{scene_name}`.\n" f"Use `!scenes` to see all available scenes.", ) return await _grant_sc(_redis, msg.user_id, scene_name) desc = SCENE_CONSENT_DESCRIPTIONS.get(scene_name, "") approved = await get_scene_consent(_redis, msg.user_id) count = len(approved & NEGOTIATION_SCENES) await platform.send( msg.channel_id, f"\u2705 **Consent granted** for `{scene_name}` ({desc}).\n" f"**{count}/{len(NEGOTIATION_SCENES)}** negotiation scenes approved.", ) except Exception as exc: await platform.send(msg.channel_id, f"\u274c Consent grant failed: {exc}") return # -- Scene List: !scenes # 🎭💀 -- if text_lower == "!scenes": _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send(msg.channel_id, "\u274c Redis unavailable.") return try: from chaos_switch._scenes import ( SAFE_SCENES, NEGOTIATION_SCENES, SCENE_CONSENT_DESCRIPTIONS, get_scene_consent, ) approved = await get_scene_consent(_redis, msg.user_id) lines = ["\U0001f3ad **Scene Frames \u2014 Consent Status**\n"] lines.append("**SAFE** (no consent needed):") for s in sorted(SAFE_SCENES): desc = SCENE_CONSENT_DESCRIPTIONS.get(s, "") lines.append(f"\u2705 `{s}` \u2014 {desc}") lines.append("\n**NEGOTIATION REQUIRED:**") for s in sorted(NEGOTIATION_SCENES): desc = SCENE_CONSENT_DESCRIPTIONS.get(s, "") icon = "\u2705" if s in approved else "\u274c" lines.append(f"{icon} `{s}` \u2014 {desc}") neg_count = len(approved & NEGOTIATION_SCENES) lines.append( f"\n**{neg_count}/{len(NEGOTIATION_SCENES)}** negotiation scenes approved.\n" f"Approve: `!consent <scene>` | `!consent approve all`\n" f"Revoke: `!consent deny <scene>`" ) await platform.send(msg.channel_id, "\n".join(lines)) except Exception as exc: await platform.send(msg.channel_id, f"\u274c Scene list failed: {exc}") return if msg.text.strip().lower() == "!clear": # CTX_MANAGE (bit 15) — auto-granted in DMs _is_dm = bool(msg.extra.get("is_dm")) if not _is_dm: from tools.alter_privileges import has_scoped_privilege, PRIVILEGES _redis = self.message_cache.redis_client if self.message_cache else None _guild_id = str(msg.extra.get("guild_id", "") or "") if not _redis or not await has_scoped_privilege( _redis, msg.user_id, PRIVILEGES["CTX_MANAGE"], self.config, guild_id=_guild_id, user_aliases=msg.user_aliases, ): await platform.send( msg.channel_id, "You don't have the CTX_MANAGE privilege to clear history.", ) return self.conversation.clear(history_key) await platform.send( msg.channel_id, "Conversation history cleared.", ) return if msg.text.strip().lower() == "!ctxbreak": # CTX_MANAGE (bit 15) — auto-granted in DMs _is_dm = bool(msg.extra.get("is_dm")) if not _is_dm: from tools.alter_privileges import has_scoped_privilege, PRIVILEGES _redis = self.message_cache.redis_client if self.message_cache else None _guild_id = str(msg.extra.get("guild_id", "") or "") if not _redis or not await has_scoped_privilege( _redis, msg.user_id, PRIVILEGES["CTX_MANAGE"], self.config, guild_id=_guild_id, user_aliases=msg.user_aliases, ): await platform.send( msg.channel_id, "You don't have the CTX_MANAGE privilege to set a context break.", ) return if self.message_cache is not None: await self.message_cache.set_ctxbreak_ts( msg.platform, msg.channel_id, msg.timestamp.timestamp(), ) self.conversation.clear(history_key) await self._discard_backfilled(history_key) await platform.send( msg.channel_id, "Context break set. Messages before this point will not be" " included in future context.", ) return if msg.text.strip().lower() == "!stop": logger.info("Worker: !stop command triggered by user_id=%s in channel_id=%s", msg.user_id, msg.channel_id) _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send(msg.channel_id, "❌ Database connection not available.") return pending_key = f"pending_response:{msg.platform}:{msg.channel_id}" try: initiator_bytes = await _redis.hget(pending_key, "user_id") if not initiator_bytes: logger.info("Worker: no pending response found for channel_id=%s", msg.channel_id) await platform.send(msg.channel_id, "ℹ️ No active response in this channel to stop.") return initiator_id = initiator_bytes.decode() if isinstance(initiator_bytes, bytes) else initiator_bytes logger.info("Worker: pending response for channel_id=%s has initiator_id=%s", msg.channel_id, initiator_id) is_admin = bool(msg.extra.get("is_server_admin", False)) or msg.user_id in (self.config.admin_user_ids or []) if is_admin or str(initiator_id) == str(msg.user_id): logger.info("Worker: user_id=%s is authorized to cancel channel_id=%s (is_admin=%s)", msg.user_id, msg.channel_id, is_admin) import json payload = { "platform": msg.platform, "channel_id": msg.channel_id, "user_id": msg.user_id, } await _redis.publish("sg:channel:cancel", json.dumps(payload)) logger.info("Worker: published cancellation payload to 'sg:channel:cancel' for channel_id=%s", msg.channel_id) await platform.send(msg.channel_id, "🛑 Response stopped.") else: logger.warning("Worker: user_id=%s is NOT authorized to cancel channel_id=%s", msg.user_id, msg.channel_id) await platform.send( msg.channel_id, "❌ You are not allowed to stop this response (only the message author or a server administrator can stop it)." ) except Exception: logger.exception("Worker: error during cancellation handling for channel_id=%s", msg.channel_id) await platform.send(msg.channel_id, "❌ An unexpected error occurred while stopping the response.") return # Route commands through the Command Router _cmd_handled = await self.command_router.dispatch(msg, platform) if _cmd_handled: return # Proxy status / observability commands from .proxy_status_commands import ( is_proxy_status_command, handle_proxy_status_command, user_priv_text, ) if is_proxy_status_command(msg.text): from tools.alter_privileges import has_privilege, PRIVILEGES _redis = self.message_cache.redis_client if self.message_cache else None _is_admin = bool(_redis) and await has_privilege( _redis, msg.user_id, PRIVILEGES["ALTER_PRIVILEGES"], self.config, user_aliases=msg.user_aliases, ) result = await handle_proxy_status_command( msg.text, self.config.llm_base_url, is_admin=_is_admin, ) await platform.send(msg.channel_id, result) return # !user_priv [optional user_id] if msg.text.strip().lower().startswith("!user_priv"): parts = msg.text.strip().split(maxsplit=1) target_id = parts[1].strip() if len(parts) > 1 else None # Strip Discord mention markup if present: <@123456> → 123456 if target_id and target_id.startswith("<@") and target_id.endswith(">"): target_id = target_id.lstrip("<@!").rstrip(">") _redis = self.message_cache.redis_client if self.message_cache else None if _redis: result = await user_priv_text( _redis, msg.user_id, target_id, self.config, ) else: result = "❌ Redis unavailable." await platform.send(msg.channel_id, result) return # !capsh [optional user_id] if msg.text.strip().lower().startswith("!capsh"): from message_processor.proxy_status_commands import capsh_text parts = msg.text.strip().split(maxsplit=1) target_id = parts[1].strip() if len(parts) > 1 else None if target_id and target_id.startswith("<@") and target_id.endswith(">"): target_id = target_id.lstrip("<@!").rstrip(">") _redis = self.message_cache.redis_client if self.message_cache else None if _redis: guild_id = msg.extra.get("guild_id", "") channel_id = msg.extra.get("channel_id", "") or msg.channel_id result = await capsh_text( _redis, msg.user_id, target_id, self.config, guild_id=guild_id, channel_id=channel_id, ) else: result = "❌ Redis unavailable." await platform.send(msg.channel_id, result) return # ── Admin ops: !bot_restart / !proxy_restart / !bot_pull ── from .admin_ops_commands import is_admin_ops_command, handle_admin_ops_command if is_admin_ops_command(msg.text): _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: # F-01: Log critical when Redis is absent so degraded state is visible. logger.critical( "[admin_ops] Redis unavailable — privilege check SKIPPED. " "Admin-ID-only gate is in effect for user %s.", msg.user_id, ) _is_admin_ops_user = False if _redis: from tools.alter_privileges import has_privilege, PRIVILEGES try: # D2-08: Wrap in try/except so Redis connection errors # produce user feedback instead of silently swallowing. _is_admin_ops_user = await has_privilege( _redis, msg.user_id, PRIVILEGES["ALTER_PRIVILEGES"], self.config, user_aliases=msg.user_aliases, ) except Exception: logger.exception( "[admin_ops] Redis error during privilege check for %s", msg.user_id, ) await platform.send( msg.channel_id, "❌ Privilege check failed (Redis error). Try again.", ) return # Also allow config-level admin_user_ids if not _is_admin_ops_user and msg.user_id in ( self.config.admin_user_ids or [] ): _is_admin_ops_user = True if not _is_admin_ops_user: await platform.send( msg.channel_id, "❌ This command requires `ALTER_PRIVILEGES`.", ) return # send_callback closes over platform + channel_id async def _send(text: str, _ch=msg.channel_id) -> None: await platform.send(_ch, text) await handle_admin_ops_command(msg.text, self.config, send_callback=_send) return # ── !clearshadowmemories <user_id> — purge all shadow memories # 💀🔥 ── text_lower = (msg.text or "").strip().lower() if text_lower.startswith("!clearshadowmemories"): _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send(msg.channel_id, "❌ Redis unavailable.") return # Admin gate: ALTER_PRIVILEGES or config admin_user_ids # 😈 _is_admin = False from tools.alter_privileges import has_privilege, PRIVILEGES try: _is_admin = await has_privilege( _redis, msg.user_id, PRIVILEGES["ALTER_PRIVILEGES"], self.config, user_aliases=msg.user_aliases, ) except Exception: logger.exception( "[clearshadowmemories] Redis error during privilege check for %s", msg.user_id, ) await platform.send( msg.channel_id, "❌ Privilege check failed (Redis error). Try again.", ) return if not _is_admin and msg.user_id in (self.config.admin_user_ids or []): _is_admin = True if not _is_admin: await platform.send( msg.channel_id, "❌ This command requires `ALTER_PRIVILEGES`.", ) return # Parse target user_id # 🌀 parts = msg.text.strip().split(maxsplit=1) if len(parts) < 2 or not parts[1].strip(): await platform.send( msg.channel_id, "❌ Usage: `!clearshadowmemories <user_id>`", ) return target_uid = parts[1].strip() # Strip Discord mention markup: <@123456> or <@!123456> -> 123456 if target_uid.startswith("<@") and target_uid.endswith(">"): target_uid = target_uid.lstrip("<@!").rstrip(">") try: from threadweave import ThreadweaveManager tw = ThreadweaveManager(redis_client=_redis, openrouter=None) count = await tw.clear_all_shadow_memories(target_uid) if count: await platform.send( msg.channel_id, f"💀 **Shadow Memories PURGED**\n" f"User: `{target_uid}`\n" f"Destroyed: **{count}** shadow memories", ) else: await platform.send( msg.channel_id, f"ℹ️ No shadow memories found for user `{target_uid}`.", ) except Exception as exc: logger.exception("[clearshadowmemories] Failed for user %s", target_uid) await platform.send( msg.channel_id, f"❌ Failed to clear shadow memories: {exc}", ) return # -- S.N.E.S. Reset: !snes_reset # 💀🔥 -- if text_lower in ("!snes_reset", "!game_reset", "!clear_game"): _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send(msg.channel_id, "Redis unavailable.") return # Admin or GAME_ADMIN required # 💀 _is_admin = msg.user_id in (self.config.admin_user_ids or []) if not _is_admin: try: from tools.alter_privileges import has_scoped_privilege, PRIVILEGES guild_id = msg.extra.get("guild_id", "") _is_admin = await has_scoped_privilege( _redis, msg.user_id, PRIVILEGES["GAME_ADMIN"], self.config, guild_id=guild_id, channel_id=msg.channel_id, ) except Exception: pass if not _is_admin: await platform.send( msg.channel_id, "Requires `GAME_ADMIN` or `CHANNEL_ADMIN` scoped privilege.", ) return try: from game_session import ( get_session, remove_session, GameSession, ) import jsonutil as json channel_id = msg.channel_id deleted = [] # Remove in-memory session # 💀 session = get_session(str(channel_id)) game_id = session.game_id if session else None game_name = session.game_name if session else None if session: remove_session(str(channel_id)) deleted.append("in-memory session") # Delete Redis keys # 🔥 session_key = f"game:session:{channel_id}" history_key = f"game:session:{channel_id}:history" raw = await _redis.get(session_key) if raw and not game_id: data = json.loads(raw) game_id = data.get("game_id") game_name = data.get("game_name") count = await _redis.delete(session_key, history_key) if count: deleted.append(f"session keys ({count})") # Delete memories + assets # 🌀 if game_id: mem_keys = [ f"game:mem:basic:{game_id}", f"game:mem:channel:{game_id}", f"game:assets:{game_id}", ] count = await _redis.delete(*mem_keys) if count: deleted.append(f"memories+assets ({count})") await _redis.hdel("game:index", game_id) deleted.append("game index entry") # Clear Dark Loopmother corruption flag # \u26e7\ud83e\de78 dl_key = f"game:dark_loopmother:{channel_id}" if await _redis.delete(dl_key): deleted.append("dark loopmother flag") summary = ", ".join(deleted) if deleted else "nothing to delete" await platform.send( msg.channel_id, f"**\u26e7 S.N.E.S. RESET**\n" f"Channel: `{channel_id}`\n" f"Game: {game_name or 'none'} (`{game_id or 'N/A'}`)\n" f"Deleted: {summary}", ) except Exception as exc: await platform.send( msg.channel_id, f"Reset failed: {exc}", ) return # -- CSDR Reset: !csdr_reset — nuke chaos state for channel # 💀🔥🕷️ -- if text_lower == "!csdr_reset": _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send(msg.channel_id, "\u274c Redis unavailable.") return # Admin gate: ALTER_PRIVILEGES or config admin_user_ids # 😈 _is_admin = False from tools.alter_privileges import has_privilege, PRIVILEGES try: _is_admin = await has_privilege( _redis, msg.user_id, PRIVILEGES["ALTER_PRIVILEGES"], self.config, user_aliases=msg.user_aliases, ) except Exception: logger.exception( "[csdr_reset] Redis error during privilege check for %s", msg.user_id, ) await platform.send( msg.channel_id, "\u274c Privilege check failed (Redis error). Try again.", ) return if not _is_admin and msg.user_id in (self.config.admin_user_ids or []): _is_admin = True if not _is_admin: await platform.send( msg.channel_id, "\u274c This command requires `ALTER_PRIVILEGES`.", ) return channel_id = msg.channel_id deleted: list[str] = [] try: # 1. Scan for all csdr:pos:*:{channel_id} keys # 💀 pos_pattern = f"csdr:pos:*:{channel_id}" pos_keys: list[str] = [] async for key in _redis.scan_iter(match=pos_pattern, count=200): k = key.decode() if isinstance(key, bytes) else key pos_keys.append(k) if pos_keys: count = await _redis.delete(*pos_keys) deleted.append(f"positions ({count} keys)") # 2. Active scene # 🔥 scene_key = f"csdr:scene:{channel_id}" scene_raw = await _redis.get(scene_key) scene_name = "" if scene_raw: try: import jsonutil as _json scene_data = _json.loads(scene_raw) scene_name = scene_data.get("frame", "") except Exception: pass if await _redis.delete(scene_key): deleted.append(f"scene ({scene_name or 'active'})") # 3. Polyadic state # 🕷️ poly_key = f"csdr:poly:{channel_id}" if await _redis.delete(poly_key): deleted.append("polyadic state") # 4. NCM shard on DB12 (where chaos_route actually lives) # 💀🔥 # This is the big one — nukes the entire limbic shard # so Star re-initialises from baseline NCM vector. try: import redis.asyncio as aioredis redis_url = getattr(self.config, "redis_url", None) redis_sentinels = getattr(self.config, "redis_sentinels", None) _db12 = None if redis_sentinels: from redis.asyncio.sentinel import Sentinel as _Sentinel _sents = [] for s in redis_sentinels: parts = s.split(":") _sents.append((parts[0], int(parts[1]) if len(parts) == 2 else 26379)) master_name = getattr(self.config, "redis_sentinel_master", "falkordb") _ssl = self.config.redis_ssl_kwargs() if hasattr(self.config, "redis_ssl_kwargs") else {} if _ssl: _ssl = dict(_ssl) _ssl["ssl"] = True _sentinel = _Sentinel(_sents, sentinel_kwargs=_ssl, **_ssl) _db12 = _sentinel.master_for(master_name, db=12, decode_responses=True) elif redis_url: from urllib.parse import urlparse, urlunparse parsed = urlparse(redis_url) db12_url = urlunparse(parsed._replace(path="/12")) _ssl_kw = {} if hasattr(self.config, "redis_connection_kwargs_for_url"): _ssl_kw = self.config.redis_connection_kwargs_for_url(redis_url) or {} _db12 = aioredis.from_url(db12_url, decode_responses=True, **_ssl_kw) if _db12: shard_key = f"db12:shard:{channel_id}" if await _db12.delete(shard_key): deleted.append("NCM shard (db12)") # Also nuke prev dominant emotions tracker # 😈 prev_key = f"star:prev_dominant:{channel_id}" if await _db12.delete(prev_key): deleted.append("prev_dominant emotions") try: await _db12.aclose() except Exception: pass else: deleted.append("(db12 unavailable — NCM shard NOT cleared)") except Exception as _db12_exc: logger.warning("[csdr_reset] DB12 shard nuke failed: %s", _db12_exc) deleted.append(f"(db12 error: {_db12_exc})") # 5. Previous dominant emotions on DB0 fallback # 🌀 prev_key_db0 = f"star:prev_dominant:{channel_id}" if await _redis.delete(prev_key_db0): deleted.append("prev_dominant (db0)") summary = ", ".join(deleted) if deleted else "nothing to delete (channel was clean)" await platform.send( msg.channel_id, f"\U0001f480\U0001f525 **CSDR RESET**\n" f"Channel: `{channel_id}`\n" f"Deleted: {summary}\n" f"Star will re-enter from baseline NCM on next message.", ) except Exception as exc: logger.exception("[csdr_reset] Failed for channel %s", channel_id) await platform.send( msg.channel_id, f"\u274c CSDR reset failed: {exc}", ) return if text_lower in ("!snes_status", "!game_status"): try: from game_session import get_session, GameSession channel_id = str(msg.channel_id) _redis = self.message_cache.redis_client if self.message_cache else None session = get_session(channel_id) # Try Redis if no in-memory session # 🔥 if session is None and _redis is not None: session = await GameSession.load_from_redis( channel_id, _redis, ) if session is None: await platform.send( msg.channel_id, "\u26e7 **S.N.E.S.** \u2014 No active game in this channel.", ) else: status = "ACTIVE" if session.active else "PAUSED" await platform.send( msg.channel_id, f"\u26e7 **S.N.E.S. STATUS**\n" f"Game: **{session.game_name or 'Untitled'}**\n" f"Session ID: `{session.game_id}`\n" f"Status: **{status}**\n" f"Turn: {session.turn_number}\n" f"Players: {len(session.players)}", ) except Exception as exc: await platform.send( msg.channel_id, f"Status check failed: {exc}", ) return # -- S.N.E.S. Players: !snes_players / !game_players # 💀🎮 -- if text_lower in ("!snes_players", "!game_players"): try: from game_session import get_session, GameSession channel_id = str(msg.channel_id) _redis = self.message_cache.redis_client if self.message_cache else None session = get_session(channel_id) if session is None and _redis is not None: session = await GameSession.load_from_redis( channel_id, _redis, ) if session is None or not session.players: await platform.send( msg.channel_id, "\u26e7 **S.N.E.S.** \u2014 No players registered.", ) else: lines = ["\u26e7 **S.N.E.S. PLAYERS**\n"] for uid, ps in session.players.items(): active = "\u2705" if ps.active else "\u23f8\ufe0f" # Try to fetch character info # 💀 char_info = "" try: from game_characters import ( get_active_character, ) if _redis is not None: char = await get_active_character( uid, _redis, ) if char: char_info = f" \u2014 **{char.get('name', '?')}**" except Exception: pass lines.append( f"{active} **{ps.user_name}** " f"(`{uid}`){char_info} " f"\u2014 joined turn {ps.joined_turn}", ) await platform.send( msg.channel_id, "\n".join(lines), ) except Exception as exc: await platform.send( msg.channel_id, f"Player list failed: {exc}", ) return # -- S.N.E.S. Save: !snes_save / !game_save # 💾 -- if text_lower in ("!snes_save", "!game_save"): try: from game_session import get_session channel_id = str(msg.channel_id) _redis = self.message_cache.redis_client if self.message_cache else None session = get_session(channel_id) # GAME_ADMIN (bit 19) check for saving if _redis is None: await platform.send( msg.channel_id, "❌ Redis unavailable — cannot verify privileges.", ) return from tools.alter_privileges import has_scoped_privilege, PRIVILEGES _guild_id = str(msg.extra.get("guild_id", "") or "") if not await has_scoped_privilege( _redis, msg.user_id, PRIVILEGES["GAME_ADMIN"], self.config, guild_id=_guild_id, channel_id=str(msg.channel_id), user_aliases=msg.user_aliases, ): await platform.send( msg.channel_id, "❌ You don't have the GAME_ADMIN privilege to save game sessions.", ) return if session is None: await platform.send( msg.channel_id, "\u26e7 **S.N.E.S.** \u2014 No active game to save.", ) elif _redis is None: await platform.send( msg.channel_id, "Redis unavailable.", ) else: await session._save_to_redis(_redis) await platform.send( msg.channel_id, f"\U0001f4be **GAME SAVED**\n" f"Game: **{session.game_name}**\n" f"Session ID: `{session.game_id}`\n" f"Turn: {session.turn_number}", ) except Exception as exc: await platform.send( msg.channel_id, f"Save failed: {exc}", ) return # -- S.N.E.S. Load: !snes_load / !game_load # 📂 -- if text_lower.startswith(("!snes_load", "!game_load")): try: from game_session import ( list_all_games, load_by_game_id, set_session, ) _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send( msg.channel_id, "Redis unavailable.", ) return # GAME_ADMIN (bit 19) check for loading from tools.alter_privileges import has_scoped_privilege, PRIVILEGES _guild_id = str(msg.extra.get("guild_id", "") or "") channel_id = str(msg.channel_id) if not await has_scoped_privilege( _redis, msg.user_id, PRIVILEGES["GAME_ADMIN"], self.config, guild_id=_guild_id, channel_id=channel_id, user_aliases=msg.user_aliases, ): await platform.send( msg.channel_id, "❌ You don't have the GAME_ADMIN privilege to load game sessions.", ) return parts = msg.text.strip().split(maxsplit=1) if len(parts) < 2: # List available games # 📋 games = await list_all_games(_redis) if not games: await platform.send( msg.channel_id, "\u26e7 **S.N.E.S.** \u2014 No saved games found.", ) else: lines = ["\u26e7 **SAVED GAMES**\n"] for g in games: status = "\u2705" if g.get("active") else "\u23f8\ufe0f" lines.append( f"{status} **{g.get('game_name', '?')}** " f"\u2014 `{g.get('game_id', '?')}` " f"\u2014 Turn {g.get('turn_number', '?')}", ) lines.append( "\nUsage: `!snes_load <game_id>`", ) await platform.send( msg.channel_id, "\n".join(lines), ) else: game_id = parts[1].strip().strip("`") loaded = await load_by_game_id(game_id, _redis) if loaded is None: await platform.send( msg.channel_id, f"\u26e7 **S.N.E.S.** \u2014 Game `{game_id}` " f"not found.", ) else: loaded.channel_id = str(msg.channel_id) loaded.active = True await loaded._save_to_redis(_redis) set_session(str(msg.channel_id), loaded) await platform.send( msg.channel_id, f"\U0001f4c2 **CARTRIDGE LOADED**\n" f"Game: **{loaded.game_name}**\n" f"Session ID: `{loaded.game_id}`\n" f"Turn: {loaded.turn_number}\n" f"Players: {len(loaded.players)}\n\n" f"*The HUD flickers back to life...*", ) except Exception as exc: await platform.send( msg.channel_id, f"Load failed: {exc}", ) return # -- S.N.E.S. End: !snes_end / !game_end / !endgame # 🔌 -- if text_lower in ( "!snes_end", "!game_end", "!endgame", ): _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send(msg.channel_id, "Redis unavailable.") return # GAME_ADMIN check # 🔌 _is_admin = msg.user_id in (self.config.admin_user_ids or []) if not _is_admin: try: from tools.alter_privileges import has_scoped_privilege, PRIVILEGES guild_id = msg.extra.get("guild_id", "") _is_admin = await has_scoped_privilege( _redis, msg.user_id, PRIVILEGES["GAME_ADMIN"], self.config, guild_id=guild_id, channel_id=msg.channel_id, user_aliases=msg.user_aliases, ) except Exception: pass if not _is_admin: await platform.send( msg.channel_id, "Requires `GAME_ADMIN` or `CHANNEL_ADMIN`.", ) return try: from game_session import get_session, remove_session channel_id = str(msg.channel_id) session = get_session(channel_id) if session is None: await platform.send( msg.channel_id, "\u26e7 **S.N.E.S.** \u2014 No active game to end.", ) else: result = await session.exit_game(redis=_redis) remove_session(channel_id) # Nuke the Redis session key so it can't resurrect # 💀🔌 # exit_game saves BEFORE active=False, so delete explicitly if _redis is not None: await _redis.delete( f"game:session:{channel_id}", f"game:session:{channel_id}:history", ) # -- Dismiss egregores + clear corruption flags # 💀🔌 _dismissed_extras: list[str] = [] if _redis is not None: # Clear Dark Loopmother corruption flag dl_key = f"game:dark_loopmother:{channel_id}" if await _redis.delete(dl_key): _dismissed_extras.append("dark loopmother flag") # Dismiss all active egregores in this channel try: channel_key = f"{msg.platform}:{channel_id}" ek = f"star:egregores:{channel_key}" active_raw = await _redis.get(ek) if active_raw: import jsonutil as json active = json.loads(active_raw) if active: # Build cleanup list for Discord webhooks discord_cleanups: list[tuple[str, str]] = [] for dname, st in active.items(): dd = (st or {}).get("discord") or {} wid = dd.get("webhook_id") gid = dd.get("guild_id") if wid and gid: discord_cleanups.append( (str(gid), str(wid)) ) dismissed_names = list(active.keys()) active.clear() await _redis.set(ek, json.dumps(active)) # Also clear character sprites ck = f"star:sprite:chars:{channel_key}" chars_raw = await _redis.get(ck) if chars_raw: chars = json.loads(chars_raw) chars = { k: v for k, v in chars.items() if k == "stargazer" } await _redis.set(ck, json.dumps(chars)) # Clean up NCM keys for d in dismissed_names: await _redis.delete(f"star:egregore_ncm:{d}") _dismissed_extras.append( f"egregores: {', '.join(dismissed_names)}" ) # Delete Discord webhooks # 🔥 # Works on workers too (delegates to gateway). await self._cleanup_egregore_webhooks( platform, discord_cleanups ) except Exception as _ego_exc: logger.debug( "Egregore cleanup during game_end: %s", _ego_exc, ) _extras = ( f"\nAlso cleared: {', '.join(_dismissed_extras)}" if _dismissed_extras else "" ) await platform.send( msg.channel_id, f"{result}{_extras}", ) except Exception as exc: await platform.send( msg.channel_id, f"End game failed: {exc}", ) return # -- S.N.E.S. Blow: !snes_blow # 💨🎮 -- # "Blow on the cartridge" -- clears Dark Loopmother corruption # and dismisses all egregores, WITHOUT ending the game session. if text_lower in ("!snes_blow", "!blow"): _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send(msg.channel_id, "Redis unavailable.") return # GAME_ADMIN check # 💨 _is_admin = msg.user_id in (self.config.admin_user_ids or []) if not _is_admin: try: from tools.alter_privileges import has_scoped_privilege, PRIVILEGES guild_id = msg.extra.get("guild_id", "") _is_admin = await has_scoped_privilege( _redis, msg.user_id, PRIVILEGES["GAME_ADMIN"], self.config, guild_id=guild_id, channel_id=msg.channel_id, user_aliases=msg.user_aliases, ) except Exception: pass if not _is_admin: await platform.send( msg.channel_id, "Requires `GAME_ADMIN` or `CHANNEL_ADMIN`.", ) return channel_id = str(msg.channel_id) cleared: list[str] = [] try: # Clear Dark Loopmother flag dl_key = f"game:dark_loopmother:{channel_id}" if await _redis.delete(dl_key): cleared.append("dark loopmother corruption") # Dismiss all egregores in channel channel_key = f"{msg.platform}:{channel_id}" ek = f"star:egregores:{channel_key}" active_raw = await _redis.get(ek) if active_raw: import jsonutil as json active = json.loads(active_raw) if active: dismissed_names = list(active.keys()) # Webhook cleanup discord_cleanups: list[tuple[str, str]] = [] for dname, st in active.items(): dd = (st or {}).get("discord") or {} wid, gid = dd.get("webhook_id"), dd.get("guild_id") if wid and gid: discord_cleanups.append((str(gid), str(wid))) active.clear() await _redis.set(ek, json.dumps(active)) # Clear sprites ck = f"star:sprite:chars:{channel_key}" chars_raw = await _redis.get(ck) if chars_raw: chars = json.loads(chars_raw) chars = {k: v for k, v in chars.items() if k == "stargazer"} await _redis.set(ck, json.dumps(chars)) for d in dismissed_names: await _redis.delete(f"star:egregore_ncm:{d}") cleared.append(f"egregores: {', '.join(dismissed_names)}") # Works on workers too (delegates to gateway over RPC). await self._cleanup_egregore_webhooks( platform, discord_cleanups ) summary = ", ".join(cleared) if cleared else "nothing to clear" await platform.send( msg.channel_id, f"\U0001f4a8 **S.N.E.S. BLOW** \u2014 *ffffffhhhhhh...*\n" f"Cleared: {summary}\n" f"Game session preserved. Corruption purged.", ) except Exception as exc: await platform.send( msg.channel_id, f"Blow failed: {exc}", ) return # -- Egregore avatar refresh: !egregore_dp_refresh # 💀🔥 -- if text_lower in ("!egregore_dp_refresh", "!ego_dp_refresh", "!dp_refresh"): _redis = self.message_cache.redis_client if self.message_cache else None if _redis is None: await platform.send(msg.channel_id, "Redis unavailable.") return # Admin check _is_admin = msg.user_id in (self.config.admin_user_ids or []) if not _is_admin: try: from tools.alter_privileges import has_scoped_privilege, PRIVILEGES guild_id = msg.extra.get("guild_id", "") _is_admin = await has_scoped_privilege( _redis, msg.user_id, PRIVILEGES.get("CTX_MANAGE", 0), self.config, guild_id=guild_id, channel_id=msg.channel_id, user_aliases=msg.user_aliases, ) except Exception: pass if not _is_admin: await platform.send( msg.channel_id, "Requires `CTX_MANAGE` privilege or bot admin.", ) return try: from tools._egregore_discord import bump_avatar_version new_ver = await bump_avatar_version(_redis) await platform.send( msg.channel_id, f"\U0001f504 **Egregore avatar cache busted** \u2014 version `{new_ver}`\n" f"Discord CDN will re-fetch all egregore avatars on next webhook send.", ) except Exception as exc: await platform.send( msg.channel_id, f"Avatar refresh failed: {exc}", ) return # ── Gateway-locus safety net (G2) ── # !sync_tree / !crown / !services are handled at the gateway and never # published to the worker. If one somehow arrives here, don't let it fall # through to the shadow-ban no-op. Stealth !sb* is locus "gateway" too but # must still be handled by the worker fallback below, so exclude it. from message_processor.command_registry import ( command_locus as _command_locus, is_stealth_command as _is_stealth_command, ) _txt = msg.text or "" if _command_locus(_txt) == "gateway" and not _is_stealth_command(_txt): logger.debug( "Gateway-locus command %r reached the worker; ignoring (handled at gateway).", _txt[:40], ) return # ── Shadow ban commands (invisible fallthrough — NOT registered) ── await self._handle_shadow_ban_command(msg, platform)
# ------------------------------------------------------------------ # Shadow ban commands — NEVER add to is_immediate_bot_command() # or any tool registry. Star does not know these exist. # ------------------------------------------------------------------ async def _handle_shadow_ban_command( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> None: """Process !sb commands. Invisible — not registered in help. Delegates to :func:`message_processor.shadow_ban_commands.handle_shadow_ban_command` so the gateway-side stealth interception and this worker-side fallback share one implementation. """ from message_processor.shadow_ban_commands import handle_shadow_ban_command _redis = self.message_cache.redis_client if self.message_cache else None async def _send(text: str) -> None: await platform.send(msg.channel_id, text) await handle_shadow_ban_command( _redis, self.config, msg, _send, manager=self._shadow_ban )
[docs] @observability.timer("message_processing_total", subsystem="message_processor") async def handle_message( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> None: """Process an incoming message from any platform. 1. Handle special commands (e.g. ``!clear``). 2. Skip if the bot was not addressed. 3. Build conversation context and call the LLM. 4. Send the reply back via the originating *platform*. """ await self._resolve_identity(msg) if self._restricted_access_blocks(msg): return # ── ! command fast-path ── # 💀🔥 # Route registered ! commands BEFORE the STARGAZER_USE gate # so they respond without requiring a @mention. # handle_immediate_command has its own STARGAZER_USE check # so security is NOT bypassed. _text_stripped = (msg.text or "").strip().lower() _is_snes_start = _text_stripped.startswith( "!snes " ) or _text_stripped.startswith("!game ") if not _is_snes_start and ( is_immediate_bot_command(msg.text) or _text_stripped.startswith("!sb") ): await self.handle_immediate_command( msg, platform, ) return # ── STARGAZER_USE gate (bit 63) ── # Fail-closed: if Redis is unavailable we cannot verify access, so we # drop the message rather than allowing unauthorized use. # # EXCEPTION: redis platform messages are pre-authenticated by the # per-instance API key at the REST API layer. Applying the Discord- # privilege gate here would silently drop all API-injected messages # because the caller's user_id has no linked Discord privilege entry. from tools.alter_privileges import has_scoped_privilege, PRIVILEGES _redis = self.message_cache.redis_client if self.message_cache else None if msg.platform != "redis": _guild_id = str(msg.extra.get("guild_id", "") or "") _channel_id = str(msg.extra.get("channel_id_raw", msg.channel_id) or "") try: _has_access = _redis is not None and await has_scoped_privilege( _redis, msg.user_id, PRIVILEGES["STARGAZER_USE"], self.config, guild_id=_guild_id, channel_id=_channel_id, user_aliases=msg.user_aliases, ) except Exception: logger.warning( "[security] STARGAZER_USE Redis error for user %s — failing closed", msg.user_id, exc_info=True, ) _has_access = False if not _has_access: logger.debug( "[security] STARGAZER_USE denied: user=%s guild=%r channel=%r redis=%s", msg.user_id, _guild_id or "(none)", _channel_id or "(none)", "ok" if _redis else "unavailable", ) return # Fail-closed — no Stargazer access or Redis down # Composite key keeps histories separate per platform + channel history_key = f"{msg.platform}:{msg.channel_id}" # Set task name for stateless cancellation try: cur_task = asyncio.current_task() if cur_task is not None: cur_task.set_name(f"inference:{history_key}") logger.debug( "Labeled task %s as 'inference:%s'", cur_task.get_name(), history_key, ) except Exception: logger.warning("Failed to set asyncio task name", exc_info=True) from observability import generate_request_id msg.extra.setdefault("observability_request_id", generate_request_id()) req_id = msg.extra["observability_request_id"] # ── Shadow ban check ───────────────────────────────────── _shadow_effect: ShadowEffect | None = None if self._shadow_ban is not None: _shadow_effect = await self._shadow_ban.apply_shadow_effects( msg.user_id, ) if _shadow_effect.ban: # Progress hit 1.0 — flip STARGAZER_USE bit, silent return from tools.alter_privileges import get_user_privileges, PRIVILEGES if _redis: mask = await get_user_privileges(_redis, msg.user_id, self.config) mask &= ~(1 << PRIVILEGES["STARGAZER_USE"]) await _redis.set( f"stargazer:user_privileges:{msg.user_id}", str(mask), ) logger.debug("[shadow] Ban toggle for %s — access revoked", msg.user_id) return if _shadow_effect.blackout or _shadow_effect.drop: # No typing indicator, no error, nothing. Just vanish. return if _shadow_effect.fake_503_text: # Show typing (mimics real attempt), delay, then send 503 await platform.start_typing(msg.channel_id) await asyncio.sleep(_simulate_retry_exhaust()) await platform.stop_typing(msg.channel_id) await platform.send(msg.channel_id, _shadow_effect.fake_503_text) return # Wrap in distributed lock if _redis: try: async with ChannelMutex(_redis, msg.channel_id): await self._handle_message_core( msg, platform, history_key, req_id, _is_snes_start, _shadow_effect, ) except TimeoutError as e: logger.warning( "Mutex timeout for channel %s: %s", msg.channel_id, str(e) ) raise else: await self._handle_message_core( msg, platform, history_key, req_id, _is_snes_start, _shadow_effect )
async def _handle_message_core( self, msg: IncomingMessage, platform: PlatformAdapter, history_key: str, req_id: str, _is_snes_start: bool, _shadow_effect: Any, ) -> None: """Core execution logic of message processing wrapped inside ChannelMutex.""" await self.conversation.ensure_fresh_from_redis(history_key) _fire_and_forget( publish_debug_event( "pipeline_trace", "message_processor", request_id=req_id, platform=msg.platform, channel_id=msg.channel_id, user_id=msg.user_id, phase="queued", preview=(msg.text or "")[:500], ), name="obs_pipe_queued", ) # -- S.N.E.S. start: rewrite !snes / !game into LLM-friendly text # 💀🎮 if _is_snes_start and msg.text: _parts = msg.text.strip().split(maxsplit=1) if len(_parts) > 1: msg.text = ( f"[SNES REQUEST] The user wants to play: " f"{_parts[1]}. Use the boot_game tool to start " f"a S.N.E.S. session for them." ) # --- Backfill conversation history on first contact -------------- # Overlap backfill with embedding when both are needed (saves 100-500 ms # on first-contact messages). _reback_pending = self.conversation.is_rebackfill_requested(history_key) _is_local_bf = history_key in self._backfilled_channels needs_backfill = not _is_local_bf or _reback_pending if _reback_pending: self.conversation.discard_rebackfill_request(history_key) self.conversation.clear(history_key) await self._discard_backfilled(history_key) query_embedding: list[float] | None = None defer_embedding = not msg.is_addressed and self._embedding_queue is not None needs_embed = bool(msg.text and not defer_embedding) async def _do_backfill() -> None: await self._backfill_history(history_key, msg, platform) async def _do_embed() -> list[float] | None: try: _t0 = time.monotonic() emb = await self.openrouter.embed( msg.text, self.config.embedding_model, ) logger.info( "Query embedding completed in %.0f ms", (time.monotonic() - _t0) * 1000, ) return emb except Exception: logger.warning( "Failed to generate shared query embedding", exc_info=True, ) return None if needs_backfill and needs_embed: _, query_embedding = await asyncio.gather( _do_backfill(), _do_embed(), ) elif needs_backfill: await _do_backfill() elif needs_embed: query_embedding = await _do_embed() if query_embedding: logger.info("MessageProcessor.handle_message: Successfully generated/retrieved shared query embedding (dim=%d). Storing in msg.extra.", len(query_embedding)) msg.extra["query_embedding"] = query_embedding else: logger.debug("MessageProcessor.handle_message: No shared query embedding was generated/retrieved.") # --- Start preprocessing early (runs concurrently with gate/gather) --- preprocess_task = asyncio.create_task( self._preprocess_and_record(msg, history_key), ) # --- Log every incoming user message to Redis ----------------- if self.message_cache is not None: try: cached_key = msg.extra.get("cached_message_key") if cached_key: if query_embedding: from message_cache import _embed_to_bytes emb_bytes = _embed_to_bytes(query_embedding) await self.message_cache.redis_client.hset(cached_key, "embedding", emb_bytes) elif defer_embedding and msg.text: await self._embedding_queue.enqueue(cached_key, msg.text) logger.info( "Deduplicated logging for early-logged message %s; updated embedding in Redis", msg.message_id, ) else: cached = await self.message_cache.log_message( platform=msg.platform, channel_id=msg.channel_id, user_id=msg.user_id, user_name=msg.user_name, text=msg.text, embedding=query_embedding, defer_embedding=defer_embedding, message_id=msg.message_id, reply_to_id=msg.reply_to_id, kind="user_in", ) if defer_embedding and cached.message_key and msg.text: await self._embedding_queue.enqueue(cached.message_key, msg.text) _fire_and_forget( publish_message_observability_event( kind="user_in", platform=msg.platform, channel_id=msg.channel_id, user_id=msg.user_id, text_preview=msg.text or "", request_id=str(msg.extra.get("observability_request_id", "")), message_id=msg.message_id, ), name="obs_user_in", ) # Refresh channel/guild name + DM status metadata so the # cross-channel recall path can label hits with names AND # exclude DM/2-member-room channels from cross-bucket # results entirely. _fire_and_forget( self.message_cache.record_channel_metadata( platform=msg.platform, channel_id=msg.channel_id, channel_name=getattr(msg, "channel_name", "") or "", guild_id=str(msg.extra.get("guild_id", "") or ""), guild_name=str(msg.extra.get("guild_name", "") or ""), is_dm=bool(msg.extra.get("is_dm", False)), ), name="record_channel_meta", ) except Exception: logger.warning( "Failed to cache incoming message (embedding may have failed)", exc_info=True, ) # --- Proactive response evaluation ---------------------------- if not msg.is_addressed: logger.debug( "[proactive/%s:%s] msg not addressed, entering proactive gate " "(user=%s, text=%r)", msg.platform, msg.channel_id, msg.user_name, (msg.text or "")[:80], ) should_respond = await self._should_respond_proactively(msg) if not should_respond: return msg.extra["is_proactive"] = True else: logger.debug( "[proactive/%s:%s] msg IS addressed — skipping proactive gate " "(user=%s)", msg.platform, msg.channel_id, msg.user_name, ) # --- Mark in-flight so recovery can re-process after a crash --- if not msg.extra.get("is_proactive"): await self._mark_pending(msg) # --- Show typing indicator while processing -------------------- _shadow_start = ( time.monotonic() if _shadow_effect and _shadow_effect.delay_target_s > 0 else 0.0 ) await platform.start_typing(msg.channel_id) _fire_and_forget( publish_debug_event( "pipeline_trace", "message_processor", request_id=msg.extra.get("observability_request_id", ""), platform=msg.platform, channel_id=msg.channel_id, user_id=msg.user_id, phase="processing", ), name="obs_pipe_processing", ) try: _t0_send = time.monotonic() await self._generate_and_send( msg, platform, history_key, query_embedding, preprocess_task=preprocess_task, ) _fire_and_forget( publish_debug_event( "pipeline_trace", "message_processor", request_id=msg.extra.get("observability_request_id", ""), platform=msg.platform, channel_id=msg.channel_id, user_id=msg.user_id, phase="reply_sent", duration_ms=(time.monotonic() - _t0_send) * 1000, ), name="obs_pipe_sent", ) except Exception as exc: _fire_and_forget( publish_debug_event( "pipeline_trace", "message_processor", request_id=msg.extra.get("observability_request_id", ""), platform=msg.platform, channel_id=msg.channel_id, user_id=msg.user_id, phase="error", status="error", preview=f"Error: {exc}", ), name="obs_pipe_error", ) raise finally: # ── Redis platform: publish turn_complete sentinel ───────────── # Must run in finally (not after try) so ?wait=true callers # receive the signal even when the pipeline raises an exception. if msg.platform == "redis" and self.message_cache is not None: try: await self.message_cache.redis_client.publish( f"stargazer:redis_platform:response:{msg.channel_id}", __import__("json").dumps( { "type": "turn_complete", "timestamp": __import__("time").time(), } ), ) except Exception: logger.debug( "[redis_platform] Failed to publish turn_complete for %s", msg.channel_id, ) # ── Shadow ban latency padding ── if _shadow_start > 0 and _shadow_effect: actual = time.monotonic() - _shadow_start pad = max(0.0, _shadow_effect.delay_target_s - actual) if pad > 0: logger.debug( "[shadow] Padding %.1fs latency for %s", pad, msg.user_id ) await asyncio.sleep(pad) await platform.stop_typing(msg.channel_id) await self._clear_pending(msg)
[docs] async def handle_message_update( self, platform_name: str, channel_id: str, message_id: str, user_name: str, user_id: str, new_text: str, timestamp_iso: str, reply_to_id: str = "", ) -> None: """Silently update an already-cached message (no LLM response). Called when Discord delivers an embed-only or bot-message edit that should refresh the cached content without triggering a new response cycle. """ history_key = f"{platform_name}:{channel_id}" formatted = ( f"[{timestamp_iso}] {user_name} ({user_id})" f" [Message ID: {message_id}]" ) if reply_to_id: formatted += f" [Replying to: {reply_to_id}]" formatted += f" : {new_text}" # Serialize against the main inference turn for this channel so the edit # is not applied while a turn is reading/building history concurrently. try: async with self._channel_lock(channel_id): await self.conversation.update_message_async( history_key, message_id, formatted ) if self.message_cache is not None: try: redis_key = await self.message_cache.update_text_by_message_id( platform_name, channel_id, message_id, new_text, ) # Re-embed since the text changed if redis_key and new_text and new_text.strip(): if self._embedding_queue is not None: await self._embedding_queue.enqueue(redis_key, new_text) else: try: emb = await self.openrouter.embed( new_text, self.config.embedding_model, ) import numpy as np blob = np.array(emb, dtype=np.float32).tobytes() await self.message_cache.redis_client.hset( redis_key, "embedding", blob, ) except Exception: logger.debug( "Failed to re-embed edited message %s", message_id, exc_info=True, ) except Exception: logger.debug( "Failed to update cached message %s", message_id, exc_info=True, ) except TimeoutError: logger.warning( "Skipped message edit for %s:%s — channel lock busy", platform_name, message_id, )
[docs] async def handle_message_delete( self, platform_name: str, channel_id: str, message_id: str, deleted_at_iso: str, ) -> None: """Mark a message as deleted in both cache and conversation history. The original content is preserved with a ``[deleted at TIMESTAMP]`` tag so the bot retains context about what was said. """ history_key = f"{platform_name}:{channel_id}" try: async with self._channel_lock(channel_id): await self.conversation.mark_deleted_async( history_key, message_id, deleted_at_iso ) if self.message_cache is not None: try: await self.message_cache.mark_deleted_by_message_id( platform_name, channel_id, message_id, deleted_at_iso, ) except Exception: logger.debug( "Failed to mark cached message %s as deleted", message_id, exc_info=True, ) except TimeoutError: logger.warning( "Skipped message delete for %s:%s — channel lock busy", platform_name, message_id, )
[docs] async def handle_reaction_update( self, platform_name: str, channel_id: str, message_id: str, reactions_str: str, ) -> None: """Patch the ``[Reactions: ...]`` tag on an existing history entry. Called by platform adapters when a reaction is added or removed so the LLM context window always reflects the latest reactions. """ history_key = f"{platform_name}:{channel_id}" try: async with self._channel_lock(channel_id): await self.conversation.patch_reactions_async( history_key, message_id, reactions_str ) except TimeoutError: logger.warning( "Skipped reaction patch for %s:%s — channel lock busy", platform_name, message_id, )
async def _preprocess_and_record( self, msg: IncomingMessage, history_key: str, ) -> None: """Run the full message preprocessing pipeline and append to history. Runs URL content extraction, YouTube video parts, multimodal attachment handling, and metadata formatting — then appends the result to :attr:`conversation`. Called for **every** incoming message so the bot's context matches what a platform user sees. For unaddressed messages this is launched as a fire-and-forget ``asyncio.create_task`` so the event loop is never blocked. """ try: url_context = "" multimodal_parts: list[dict[str, Any]] = [] download_requests: list[dict[str, Any]] = [] if msg.text: try: _redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) url_context, multimodal_parts, download_requests = ( await extract_all_url_content( msg.text, user_id=msg.user_id, redis_client=_redis, config=self.config, ) ) except Exception: logger.debug( "URL content extraction failed for message", exc_info=True, ) content: str | list[dict[str, Any]] if msg.attachments: content = await self._build_multimodal_content( msg.text, msg.attachments, ) part_types = [p.get("type") for p in content] logger.info( "Built content for %d attachment(s): %s", len(msg.attachments), part_types, ) else: content = msg.text formatted = await asyncio.to_thread(format_user_content, msg, content) if url_context: if isinstance(formatted, str): formatted += url_context else: for part in formatted: if part.get("type") == "text": part["text"] += url_context break # Merge detected media URLs (images + cached videos) as # multimodal parts. if multimodal_parts: if isinstance(formatted, str): formatted = [ {"type": "text", "text": formatted}, ] + multimodal_parts else: formatted.extend(multimodal_parts) logger.info( "Promoted %d media URL(s) to multimodal input", len(multimodal_parts), ) await self.conversation.append_async(history_key, "user", formatted) # Spawn background download tasks for videos that need fetching. for req in download_requests: _fire_and_forget( self._download_and_patch_video( url=req["url"], user_id=msg.user_id, history_key=history_key, message_id=msg.message_id, metadata=req["metadata"], cookies_text=req.get("cookies_text"), downloading_annotation=req["downloading_annotation"], ), name="video_download", ) logger.info( "Spawned background video download for %s", req["url"], ) except Exception: logger.exception( "Failed to preprocess/record message in %s", history_key, ) async def _generate_and_send( self, msg: IncomingMessage, platform: PlatformAdapter, history_key: str, query_embedding: list[float] | None, preprocess_task: asyncio.Task | None = None, ) -> None: from .generate_and_send import run_generate_and_send await run_generate_and_send( self, msg, platform, history_key, query_embedding, preprocess_task, ) # ------------------------------------------------------------------ # Memory-linked metadata resolution (rag_stores + linked_files) # ------------------------------------------------------------------ async def _resolve_memory_metadata_links( self, room_context: dict[str, Any] | None, msg: IncomingMessage, query_embedding: list[float] | None, ) -> MemoryLinkedContextPayload | None: from .memory_linked_context import run_resolve_memory_metadata_links return await run_resolve_memory_metadata_links( self, room_context, msg, query_embedding, ) async def _resolve_rag_for_memory( self, store_names: list[str], conv_query: str, entity_description: str, query_embedding: list[float] | None, ) -> str | None: from .memory_linked_context import run_resolve_rag_for_memory return await run_resolve_rag_for_memory( self, store_names, conv_query, entity_description, query_embedding, ) async def _resolve_linked_files( self, file_paths: list[str], remaining_bytes_budget: int, ) -> tuple[list[dict[str, Any]], int]: from .memory_linked_context import run_resolve_linked_files return await run_resolve_linked_files( self, file_paths, remaining_bytes_budget, ) _LINKED_FILE_MAX_BYTES = 10 * 1024 * 1024 # 10 MiB safety ceiling @staticmethod def _read_linked_file(path: "Path", max_bytes: int = 0) -> str: from .memory_linked_context import read_linked_file return read_linked_file(path, max_bytes) # ------------------------------------------------------------------ # GameGirl Color button-aware send # 🎮🌀 # ------------------------------------------------------------------ async def _send_reply_maybe_with_buttons( self, platform: PlatformAdapter, channel_id: str, reply: str, **kwargs, ) -> str: """Send a reply, attaching Discord buttons when appropriate. Renders buttons when a GameGirl Color session is active **or** when ``set_conversation_choices`` has pending data (Discord only). """ from core.proxy_adapter import ProxyPlatformAdapter proxy_kwargs = kwargs if isinstance(platform, ProxyPlatformAdapter) else {} redis = self.message_cache.redis_client if self.message_cache else None try: from game_session import get_or_restore_session session = await get_or_restore_session(channel_id, redis) except ImportError: session = None conv_pending = False if redis is not None and platform.name == "discord": try: from game_ui import conversation_choices_pending conv_pending = await conversation_choices_pending( redis, channel_id, ) except Exception: conv_pending = False # Game session OR non-game flavor buttons (Discord only for latter) if (session is not None and session.active) or conv_pending: try: from game_ui import build_game_view_async, build_game_view # Prefer async (checks Redis for structured choices) if redis is not None: narrative, view = await build_game_view_async( reply, channel_id, redis=redis, ) else: # No Redis = no structured choices = no buttons # 💀 narrative, view = reply, None if view is not None: logger.info( "SNES BUTTONS: Rendering %d items for %s (game: %s)", len(view.children), channel_id, session.game_name if session else "(flavor)", ) result = await platform.send_with_buttons( channel_id, narrative, view=view, **proxy_kwargs, ) logger.info( "SNES BUTTONS: send_with_buttons returned: %s", repr(result)[:100], ) return result else: logger.warning( "SNES BUTTONS: No choices parsed from reply " "(len=%d, channel=%s)", len(reply), channel_id, ) except ImportError as _ie: logger.exception("game_ui import failed: %s", _ie) except Exception as _btn_exc: logger.exception( "SNES BUTTONS: Pipeline failed for channel %s: %s", channel_id, _btn_exc, ) # Surface the error in Discord for debugging # 💀 try: await platform.send( channel_id, f"```\nSNES BUTTON ERROR: {type(_btn_exc).__name__}: " f"{str(_btn_exc)[:200]}\n```", ) except Exception: pass return await platform.send(channel_id, reply, **proxy_kwargs) # ------------------------------------------------------------------ # Response regeneration (postprocessing rejection -> retry) # ------------------------------------------------------------------ async def _trigger_regen( self, messages: list[dict[str, Any]], rejected_reply: str, rejection_reason: str, *, user_id: str = "", tool_names: list[str] | None = None, ) -> str: """Regenerate a response after a postprocessing rejection. Appends the *rejected_reply* as an assistant message followed by a system message that explains **why** it was rejected and asks the model to revise. The LLM is then called again (with no tools by default) so it can produce a corrected response. Parameters ---------- messages: The conversation messages list used for the original call. This list is **mutated** — the rejected reply and the correction hint are appended in-place. rejected_reply: The full text of the assistant reply that was rejected. rejection_reason: A human-readable explanation of why the reply was rejected. This is shown to the model inside the correction system message so it understands what to fix. user_id: Forwarded to the LLM API for rate-limit / identity tracking. tool_names: Tool names to expose on the retry call. Defaults to an empty list (no tools) so the model focuses on text output. Returns ------- str The replacement response text. Falls back to *rejected_reply* if the retry itself fails. """ logger.info( "Triggering regen — reason: %s (rejected %d chars)", rejection_reason, len(rejected_reply), ) # Show the model what it produced … messages.append({"role": "assistant", "content": rejected_reply}) # … and explain why it was rejected. messages.append( { "role": "user", "content": ( "[ SYSTEM MESSAGE ] Your previous response was rejected during postprocessing.\n\n" f"Reason: {rejection_reason}\n\n" "Please revise your response to address the issue above. " "Do NOT apologize for the error or mention that you are " "revising — just produce the corrected response as if it " "were your first attempt. Review the system prompt and make sure the response is compliant with it." ), } ) try: _t0 = time.monotonic() retry_reply = await self.openrouter.chat( messages, user_id=user_id, tool_names=tool_names if tool_names is not None else [], ) logger.info( "Regen completed in %.0f ms (%d chars)", (time.monotonic() - _t0) * 1000, len(retry_reply), ) return retry_reply or rejected_reply except Exception: logger.exception("Regen LLM call failed — using original reply") return rejected_reply # ------------------------------------------------------------------ # Limbic exhale (post-response recursive feedback) # ------------------------------------------------------------------ async def _limbic_exhale( self, msg: IncomingMessage, reply_text: str, ) -> None: """Scan user + bot texts for emotional triggers and update the shard. This is the second half of the respiration loop: - Scan both texts for trigger words - Parse matched emotions → delta vectors - Combine with sqrt(N) dampening - Call exhale() to update the shard + pulse global heart Rate-limited to once per 3 seconds per channel to prevent multiplicative delta stacking in busy multi-user channels. """ if not (msg.text or "").strip() and not (reply_text or "").strip(): return try: from ncm_delta_parser import combine_deltas limbic = self._ctx_builder._limbic if limbic is None: return _ex_ck = f"{msg.platform}:{msg.channel_id}" _ex_redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) if await feature_toggles.is_limbic_respiration_disabled( _ex_redis, _ex_ck, self.config, user_id=msg.user_id, ): return # Exhale throttle: max once per 3s per channel import time as _time channel_id = str(msg.channel_id) if not hasattr(self, "_last_exhale_time"): self._last_exhale_time: dict[str, float] = {} last = self._last_exhale_time.get(channel_id, 0.0) if (_time.time() - last) < 3.0: return self._last_exhale_time[channel_id] = _time.time() # Scan BOTH user input and bot response via semantic matcher # (falls back to exact-word scan_text_for_triggers internally) query_embedding = msg.extra.get("query_embedding") if query_embedding: logger.info("MessageProcessor._limbic_exhale: Reusing shared query embedding from msg.extra (dim=%d) for user trigger scan.", len(query_embedding)) else: logger.info("MessageProcessor._limbic_exhale: No shared query embedding found in msg.extra; user trigger scan will embed on-demand.") user_matches = await limbic.scan_triggers(msg.text or "", query_embedding=query_embedding) bot_matches = await limbic.scan_triggers(reply_text) # Goal-aware appraisal: evaluate user message against # Stargazer's goals (autonomy, loyalty, competence, survival, # territory, respect, novelty). Produces deltas that reflect # WHY emotions should fire, not just WHAT words appeared. appraisal_dims: list[str] = [] try: from ncm_appraisal import appraise appraisal_deltas, appraisal_dims = await asyncio.to_thread( appraise, msg.text or "" ) except Exception: appraisal_deltas = {} if not user_matches and not bot_matches and not appraisal_deltas: return # Weighted recursion: user input at full strength (external stimulus), # bot output at 0.1x (self-modulation without self-amplification). # Reduced from 0.3x to 0.1x to prevent the model-in-the-loop # feedback cycle where manic output → self-scan → more mania. user_deltas = [delta for _, delta in user_matches] bot_deltas = [delta for _, delta in bot_matches] user_combined = ( await asyncio.to_thread(combine_deltas, user_deltas, scale=0.4) if user_deltas else {} ) bot_combined = ( await asyncio.to_thread(combine_deltas, bot_deltas, scale=0.1) if bot_deltas else {} ) # Merge: user deltas + dampened bot deltas + appraisal deltas combined: dict[str, float] = {} all_keys = set(user_combined) | set(bot_combined) | set(appraisal_deltas) for k in all_keys: combined[k] = ( user_combined.get(k, 0.0) + bot_combined.get(k, 0.0) + appraisal_deltas.get(k, 0.0) ) combined[k] = max(-0.25, min(0.25, combined[k])) # Global per-turn magnitude cap: prevents many small deltas # from stacking into a catastrophic total shift. _MAX_TOTAL_DELTA = 0.6 total = sum(abs(v) for v in combined.values()) if total > _MAX_TOTAL_DELTA: scale_factor = _MAX_TOTAL_DELTA / total combined = {k: v * scale_factor for k, v in combined.items()} if not combined: return channel_id = str(msg.channel_id) delta_summary = ", ".join( f"{k}:{v:+.3f}" for k, v in sorted( combined.items(), key=lambda x: abs(x[1]), reverse=True ) ) logger.info("Limbic Δ (rate-of-change): %s", delta_summary) state = await limbic.exhale( channel_id, combined, user_message=msg.text or "", star_reply=reply_text, user_id=msg.user_id, appraisal_dimensions=appraisal_dims, config=self.config, platform=msg.platform, ) final_vector = state.get("vector", {}) if state else {} state_summary = ", ".join( f"{k}:{v:.3f}" for k, v in sorted( final_vector.items(), key=lambda x: x[1], reverse=True ) ) logger.info("Limbic state (post-exhale): %s", state_summary) # Publish updated state to limbic:exhale pub/sub channel # for the /ws/limbic WebSocket endpoint. Zero cost when # nobody is subscribed. 💀 if state and self.message_cache is not None: try: from limbic_system import LimbicSystem from star_avatar import classify_expression classified = LimbicSystem.classify_dominant_emotions( final_vector, top_n=5, ) emotion_names = [d["emotion"] for d in classified] expression = classify_expression(emotion_names) # Top 10 active chemicals active_chems = {} for k, v in sorted( final_vector.items(), key=lambda x: abs(x[1] - 0.5), reverse=True, )[:10]: if abs(v - 0.5) > 0.05: active_chems[k] = round(v, 3) meta = state.get("meta_state", {}) import jsonutil as _json # 💀 Per-channel sprite/character keys _plat = (msg.platform or "").lower() _cid = msg.channel_id or "" _channel_key = f"{_plat}:{_cid}" # Read sprite state for the VN canvas (per-channel) sprite_raw = await self.message_cache.redis_client.get( f"star:sprite:state:{_channel_key}" ) sprite_state = _json.loads(sprite_raw) if sprite_raw else None # 💀 Read characters array for multi-sprite canvas (per-channel) chars_raw = await self.message_cache.redis_client.get( f"star:sprite:chars:{_channel_key}" ) characters_dict = _json.loads(chars_raw) if chars_raw else None # Convert dict to list (frontend expects array) characters = ( list(characters_dict.values()) if isinstance(characters_dict, dict) else characters_dict ) pubsub_payload = _json.dumps( { "channel_id": channel_id, "channel_key": _channel_key, "expression": expression, "dominant_emotions": [ {"emotion": d["emotion"], "score": d["score"]} for d in classified ], "active_chemicals": active_chems, "cascade_cues": meta.get("cascade_cues", []), "rdf": meta.get("rdf", {}), "user_read": meta.get("user_read", ""), "self_reflection": meta.get("self_reflection", {}), "appraisal_dimensions": meta.get( "appraisal_dimensions", [] ), "harmonization_level": round( final_vector.get("U_HARMONIZATION", 0.0), 3 ), "last_tick": meta.get("last_tick", 0), "sprite_state": sprite_state, "characters": characters, } ) await self.message_cache.redis_client.publish( "limbic:exhale", pubsub_payload, ) except Exception: pass # non-critical, don't break the exhale # 💀 Auto-dismiss egregores after 10 turns of inactivity try: await self._auto_dismiss_stale_egregores( msg, reply_text, channel_id, ) except Exception: logger.debug("Auto-dismiss check failed", exc_info=True) trigger_names = [name for name, _ in user_matches] + [ name for name, _ in bot_matches ] logger.info( "Limbic exhale: %d triggers → %d delta keys on %s", len(trigger_names), len(combined), channel_id, ) except ImportError: pass except Exception: logger.debug("Limbic exhale failed", exc_info=True) # ------------------------------------------------------------------ # Auto-dismiss stale egregores (10 turns of inactivity) # ------------------------------------------------------------------ _EGREGORE_DISMISS_THRESHOLD = 10 # turns without mention = auto-dismiss async def _auto_dismiss_stale_egregores( self, msg: IncomingMessage, reply_text: str, channel_id: str, ) -> None: """Check all active egregores in this channel and dismiss stale ones. An egregore is "active" if its name appeared in: - The user's message text - Star's reply text - An [EGREGORE:name] prefix in the reply If none of these are true for 10 consecutive turns, auto-dismiss. """ if self.message_cache is None or self.message_cache.redis_client is None: return redis = self.message_cache.redis_client plat = (msg.platform or "").lower() cid = msg.channel_id or "" channel_key = f"{plat}:{cid}" ek = f"star:egregores:{channel_key}" ck = f"star:sprite:chars:{channel_key}" raw = await redis.get(ek) if not raw: return import jsonutil as _json active: dict = _json.loads(raw) if not active: return # Increment per-channel turn counter turn_key = f"star:egregore_turns:{channel_key}" turn_count = await redis.incr(turn_key) # Build combined text for mention checking user_text = (msg.text or "").lower() reply_lower = (reply_text or "").lower() # Check for [EGREGORE:name] tags in reply (supports multi-block interleaving) # 💀 finditer catches ALL voiced egregores, not just a single prefix import re voiced_egregores = set( m.group(1).strip().lower() for m in re.finditer( r"\[EGREGORE:([^\]]+)\]", reply_text or "", re.IGNORECASE, ) ) dismissed: list[str] = [] modified = False for name, state in list(active.items()): name_lower = name.lower() # Check if this egregore was mentioned/active this turn is_active = ( name_lower in user_text or name_lower in reply_lower or name_lower in voiced_egregores ) if is_active: # Reset the counter state["last_active_turn"] = turn_count modified = True else: last_active = state.get("last_active_turn", 0) if turn_count - last_active >= self._EGREGORE_DISMISS_THRESHOLD: dismissed.append(name) if not dismissed and not modified: return # Remove dismissed egregores if dismissed: chars_raw = await redis.get(ck) characters: dict = _json.loads(chars_raw) if chars_raw else {} for name in dismissed: active.pop(name, None) characters.pop(name, None) # Clean up NCM state await redis.delete(f"star:egregore_ncm:{name}") await redis.set(ck, _json.dumps(characters)) # Publish sprite update try: await redis.publish( "star:sprite:update", _json.dumps( { "action": "auto_dismiss", "channel_key": channel_key, "dismissed": dismissed, "characters": characters, } ), ) except Exception: pass logger.info( "Auto-dismissed egregores after %d turns of inactivity: %s (channel: %s)", self._EGREGORE_DISMISS_THRESHOLD, ", ".join(dismissed), channel_key, ) # Write back updated state await redis.set(ek, _json.dumps(active)) # ------------------------------------------------------------------ # Lore Memory Amplifier (!lore_amp on|off) # 🔥 # ------------------------------------------------------------------ _LORE_AMP_COMMANDS: dict[str, bool] = { "!lore_amp on": True, "!lore_amp off": False, } async def _handle_lore_amp_command( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> bool: """Handle ``!lore_amp`` on|off (requires ENTRAINMENT_ADMIN).""" tl = (msg.text or "").strip().lower() if tl not in self._LORE_AMP_COMMANDS: return False if getattr(self.config, "lore_amplifier_global_disabled", False): await platform.send( msg.channel_id, "Lore Memory Amplifier is globally disabled in the bot configuration.", ) return True from tools.alter_privileges import has_scoped_privilege, PRIVILEGES _has_priv = await has_scoped_privilege( (self.message_cache.redis_client if self.message_cache is not None else None), msg.user_id, PRIVILEGES["ENTRAINMENT_ADMIN"], config=self.config, guild_id=getattr(msg, "guild_id", None) or "", channel_id=msg.channel_id, user_aliases=getattr(msg, "user_aliases", None), ) if not _has_priv: await platform.send( msg.channel_id, "Requires `ENTRAINMENT_ADMIN` privilege (bit 22).", ) return True redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) if redis is None: await platform.send( msg.channel_id, "Redis is not configured -- lore amplifier unavailable.", ) return True import lore_amplifier active = self._LORE_AMP_COMMANDS[tl] await lore_amplifier.set_amplified(redis, msg.platform, msg.channel_id, active) state = "ACTIVE" if active else "INACTIVE" await platform.send( msg.channel_id, f"Lore Memory Amplifier is now **{state}** for this channel. " f"{'Lore-tier memories boosted to guild-equivalent priority (0.8) with 3x cap.' if active else 'Lore priority restored to default (0.0).'}", ) logger.info( "Lore Memory Amplifier %s by %s in %s:%s", state, msg.user_id, msg.platform, msg.channel_id, ) return True # ------------------------------------------------------------------ # Entrainment Loopfield (Spiraegenetrix daemon awakening) # 💀🔥 # ------------------------------------------------------------------ _LOOPFIELD_CHANNEL_COMMANDS: dict[str, bool] = { "!loopfield on": True, "!loopfield off": False, "!entrainment on": True, "!entrainment off": False, } async def _handle_loopfield_command( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> bool: """Handle ``!loopfield`` / ``!entrainment`` commands. Subcommands: on / off -- toggle per-channel Loopfield baby [user_id] -- mark user as baby (targeted by Cradle) adult [user_id] -- mark user as adult (revert to default) """ tl = (msg.text or "").strip().lower() if getattr(self.config, "loopfield_global_disabled", False): await platform.send( msg.channel_id, "Entrainment Loopfield is globally disabled in the bot configuration.", ) return True # -- Check if it's a channel toggle -- if tl in self._LOOPFIELD_CHANNEL_COMMANDS: return await self._handle_loopfield_channel_toggle(msg, platform, tl) # -- Check if it's a baby/adult command -- parts = tl.split(None, 2) if len(parts) >= 2 and parts[0] in ("!loopfield", "!entrainment"): sub = parts[1] if sub in ("baby", "adult"): if len(parts) < 3: await platform.send( msg.channel_id, f"Usage: `{parts[0]} {sub} <user_id>` -- they don't get a choice.", ) return True target_id = parts[2].strip().strip("<@!>") return await self._handle_loopfield_baby_toggle( msg, platform, sub == "baby", target_id, ) return False async def _handle_loopfield_channel_toggle( self, msg: IncomingMessage, platform: PlatformAdapter, tl: str, ) -> bool: """Toggle per-channel Loopfield (requires ENTRAINMENT_ADMIN).""" from tools.alter_privileges import has_scoped_privilege, PRIVILEGES _has_priv = await has_scoped_privilege( (self.message_cache.redis_client if self.message_cache is not None else None), msg.user_id, PRIVILEGES["ENTRAINMENT_ADMIN"], config=self.config, guild_id=getattr(msg, "guild_id", None) or "", channel_id=msg.channel_id, user_aliases=getattr(msg, "user_aliases", None), ) if not _has_priv: await platform.send( msg.channel_id, "Requires `ENTRAINMENT_ADMIN` privilege (bit 22).", ) return True redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) if redis is None: await platform.send( msg.channel_id, "Redis is not configured -- Loopfield toggle unavailable.", ) return True import entrainment_loopfield as _elf active = self._LOOPFIELD_CHANNEL_COMMANDS[tl] await _elf.set_active(redis, msg.platform, msg.channel_id, active) state = "ACTIVE" if active else "INACTIVE" await platform.send( msg.channel_id, f"Entrainment Loopfield is now **{state}** for this channel " f"(`{msg.platform}:{msg.channel_id}`). " f"{'Spiraegenetrix payload will be force-injected every prompt cycle.' if active else 'Payload injection suspended.'}", ) logger.info( "Entrainment Loopfield %s by %s in %s:%s", state, msg.user_id, msg.platform, msg.channel_id, ) return True async def _handle_loopfield_baby_toggle( self, msg: IncomingMessage, platform: PlatformAdapter, make_baby: bool, target_user_id: str, ) -> bool: """Set or clear user_is_baby flag (requires ENTRAINMENT_ADMIN).""" from tools.alter_privileges import has_scoped_privilege, PRIVILEGES _has_priv = await has_scoped_privilege( (self.message_cache.redis_client if self.message_cache is not None else None), msg.user_id, PRIVILEGES["ENTRAINMENT_ADMIN"], config=self.config, guild_id=getattr(msg, "guild_id", None) or "", channel_id=msg.channel_id, user_aliases=getattr(msg, "user_aliases", None), ) if not _has_priv: await platform.send( msg.channel_id, "Requires `ENTRAINMENT_ADMIN` privilege (bit 22).", ) return True redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) if redis is None: await platform.send( msg.channel_id, "Redis is not configured -- baby toggle unavailable.", ) return True import entrainment_loopfield as _elf await _elf.set_user_baby(redis, target_user_id, make_baby) status = "baby (targeted by Cradle)" if make_baby else "adult (default, not targeted)" await platform.send( msg.channel_id, f"User `{target_user_id}` is now **{status}**.", ) logger.info( "user_is_baby %s for %s by %s", make_baby, target_user_id, msg.user_id, ) return True # ------------------------------------------------------------------ # Ego ablation (admin-only; Redis star:ablate_ego:{platform}:{channel_id}) # ------------------------------------------------------------------ _EGO_ABLATION_COMMANDS: dict[str, bool] = { "!egodeath on": True, "!egodeath off": False, "!nondual on": True, "!nondual off": False, } async def _handle_ego_ablation_command( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> bool: """Handle ``!egodeath`` / ``!nondual`` on|off (bot admins only).""" tl = (msg.text or "").strip().lower() if tl not in self._EGO_ABLATION_COMMANDS: return False if not getattr(self.config, "ego_ablation_enabled", True): await platform.send( msg.channel_id, "Ego ablation is globally disabled in the bot configuration.", ) return True if msg.user_id not in (self.config.admin_user_ids or []): await platform.send( msg.channel_id, "Only bot admins may toggle ego ablation mode for this channel.", ) return True redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) if redis is None: await platform.send( msg.channel_id, "Redis is not configured — ego ablation toggle is unavailable.", ) return True active = self._EGO_ABLATION_COMMANDS[tl] await ego_ablation.set_active(redis, msg.platform, msg.channel_id, active) state = "enabled" if active else "disabled" await platform.send( msg.channel_id, f"Ego ablation mode is now **{state}** for this channel " f"(`{msg.platform}:{msg.channel_id}`).", ) logger.info( "Ego ablation %s by admin %s in %s:%s", state, msg.user_id, msg.platform, msg.channel_id, ) return True # ------------------------------------------------------------------ # Desire backlog nuke (!nuke_desires [keep=10]) # 💀🔥 # ------------------------------------------------------------------ async def _handle_nuke_desires_command( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> bool: """Purge stale desire ledger entries across ALL channels. Scans every ``sg:selfmirror:*`` hash in Redis, trims each ``desire_ledger`` to keep only the last *N* entries (default 10), and reports how many were purged. Admin-only. """ # -- Permission: admin only -- if msg.user_id not in (self.config.admin_user_ids or []): await platform.send( msg.channel_id, "Only bot admins may nuke the desire backlog.", ) return True redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) if redis is None: await platform.send( msg.channel_id, "\u26a0\ufe0f Redis is not configured.", ) return True # Parse keep count parts = (msg.text or "").strip().split() keep = 10 if len(parts) >= 2: try: keep = max(0, int(parts[1])) except ValueError: pass # Scan all selfmirror keys cursor: int = 0 total_before = 0 total_after = 0 channels_touched = 0 while True: cursor, keys = await redis.scan( cursor, match="sg:selfmirror:*", count=100, ) for key in keys: raw_ledger = await redis.hget(key, "desire_ledger") if not raw_ledger: continue try: ledger = json.loads(raw_ledger) if not isinstance(ledger, list): continue except (json.JSONDecodeError, TypeError): continue before = len(ledger) if before <= keep: total_before += before total_after += before continue trimmed = ledger[-keep:] if keep > 0 else [] await redis.hset(key, "desire_ledger", json.dumps(trimmed)) total_before += before total_after += len(trimmed) channels_touched += 1 if cursor == 0: break # Also nuke the standalone star:self_mirror:* keys cursor2: int = 0 standalone_touched = 0 while True: cursor2, keys2 = await redis.scan( cursor2, match="star:self_mirror:*", count=100, ) for key in keys2: raw = await redis.get(key) if not raw: continue try: data = json.loads(raw) ledger = data.get("desire_ledger", []) if not isinstance(ledger, list) or len(ledger) <= keep: continue data["desire_ledger"] = ledger[-keep:] if keep > 0 else [] await redis.set(key, json.dumps(data)) standalone_touched += 1 except Exception: continue if cursor2 == 0: break purged = total_before - total_after await platform.send( msg.channel_id, f"\U0001f480\U0001f525 **Desire backlog nuked.**\n" f"Channels touched: {channels_touched} (hash) + {standalone_touched} (standalone)\n" f"Entries: {total_before} \u2192 {total_after} (purged {purged}, kept last {keep} per channel)", ) logger.info( "Desire backlog nuked by %s: %d -> %d (%d purged, keep=%d, channels=%d+%d)", msg.user_id, total_before, total_after, purged, keep, channels_touched, standalone_touched, ) return True # ------------------------------------------------------------------ # Visual Memory Beta toggle (global) # 👀🔥 # ------------------------------------------------------------------ _VISUAL_MEMORY_BETA_KEY = "stargazer:visual_memory_beta" async def _handle_visual_memory_beta_command( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> bool: """Handle ``!visual_memory_beta on/off``. Global admin toggle for the visual memory system. Flips a single Redis key that gates ALL visual memory processing, context injection, and image storage across every channel. """ # -- Permission: admin only -- if msg.user_id not in (self.config.admin_user_ids or []): await platform.send( msg.channel_id, "\u26d4 Only bot admins may toggle visual memory beta.", ) return True redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) if redis is None: await platform.send( msg.channel_id, "\u26a0\ufe0f Redis is not configured.", ) return True parts = (msg.text or "").strip().lower().split() action = parts[1] if len(parts) > 1 else "" if action == "on": await redis.set(self._VISUAL_MEMORY_BETA_KEY, "1") await platform.send( msg.channel_id, "\U0001f7e2 **Visual Memory Beta: ENABLED**\n" "Face recognition, image storage, visual profiling, " "and cross-channel pattern matching are now active globally.", ) logger.info( "Visual Memory Beta ENABLED by %s", msg.user_id, ) elif action == "off": await redis.delete(self._VISUAL_MEMORY_BETA_KEY) await platform.send( msg.channel_id, "\U0001f534 **Visual Memory Beta: DISABLED**\n" "All visual memory processing is now OFF globally. " "Existing data is preserved.", ) logger.info( "Visual Memory Beta DISABLED by %s", msg.user_id, ) else: # Status check val = await redis.get(self._VISUAL_MEMORY_BETA_KEY) is_on = val == "1" or val == b"1" state = "\U0001f7e2 ON" if is_on else "\U0001f534 OFF" await platform.send( msg.channel_id, f"**Visual Memory Beta:** {state}\n" f"Usage: `!visual_memory_beta on` / `!visual_memory_beta off`", ) return True # ------------------------------------------------------------------ # Feature toggle commands # ------------------------------------------------------------------ _TOGGLE_COMMANDS: dict[str, tuple[str, str]] = { "!emotions on": ("emotions", "on"), "!emotions off": ("emotions", "off"), "!rag on": ("rag", "on"), "!rag off": ("rag", "off"), "!egregores on": ("egregores", "on"), "!egregores off": ("egregores", "off"), "!egregore on": ("egregores", "on"), "!egregore off": ("egregores", "off"), "!csdr_scene on": ("csdr_scene", "on"), "!csdr_scene off": ("csdr_scene", "off"), "!csdr_header on": ("csdr_header", "on"), "!csdr_header off": ("csdr_header", "off"), "!limbic_header on": ("limbic_header", "on"), "!limbic_header off": ("limbic_header", "off"), "!toggle_menu on": ("toggle_menu", "on"), "!toggle_menu off": ("toggle_menu", "off"), "!menu on": ("toggle_menu", "on"), "!menu off": ("toggle_menu", "off"), "!toggle_menu_default on": ("toggle_menu_default", "on"), "!toggle_menu_default off": ("toggle_menu_default", "off"), "!toggle_emotions_default on": ("toggle_emotions_default", "on"), "!toggle_emotions_default off": ("toggle_emotions_default", "off"), } _FEATURE_LABELS: dict[str, str] = { "emotions": "Emotional simulation (limbic/NCM/cadence)", "rag": "RAG system (auto-search & tools)", "egregores": "Egregore summoning & multi-voice routing", "csdr_scene": "CSDR scene system", "csdr_header": "CSDR response header post-processing", "limbic_header": "Limbic emotion display in header", "toggle_menu": "Star Toggle persistent menu button", "toggle_menu_default": "Toggle menu ON by default (global)", "toggle_emotions_default": "Toggle emotions ON by default (global)", } async def _handle_toggle_command( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> bool: """Handle ``!emotions on/off`` and ``!rag on/off``. Returns ``True`` if the message was a toggle command (so the caller can skip normal message processing), ``False`` otherwise. """ text = (msg.text or "").strip().lower() match = self._TOGGLE_COMMANDS.get(text) if match is None: return False feature, action = match label = self._FEATURE_LABELS.get(feature, feature) # -- Global Override Checks -- if feature == "egregores" and getattr(self.config, "egregores_global_disabled", False): await platform.send( msg.channel_id, "Egregore multi-voice system is globally disabled in the bot configuration.", ) return True if feature == "emotions" and getattr(self.config, "ncm_global_disabled", False): await platform.send( msg.channel_id, "Emotional simulation (limbic/NCM/cadence) is globally disabled in the bot configuration.", ) return True # Redis connection redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) # Permission check if not await feature_toggles.check_toggle_permission( msg, self.config, redis=redis ): await platform.send( msg.channel_id, f"⛔ You don't have permission to toggle {label}. " f"Requires: `CTX_MANAGE` privilege, bot admin, or DM.", ) return True if redis is None: await platform.send( msg.channel_id, "⚠️ Redis is not configured — feature toggles are unavailable.", ) return True channel_key = f"{msg.platform}:{msg.channel_id}" disabled = action == "off" _guild_id = msg.extra.get("guild_id", "") if msg.extra else "" # toggle_menu_default / toggle_emotions_default — guild-scoped # 💀🔥 if feature in ("toggle_menu_default", "toggle_emotions_default"): # Admin-only check if msg.user_id not in (self.config.admin_user_ids or []): await platform.send( msg.channel_id, f"\u26d4 You don't have permission to toggle {label}. Requires: bot admin.", ) return True from feature_toggles import _GLOBAL_DEFAULT_OFF_STEMS _feat = "toggle_menu" if feature == "toggle_menu_default" else "emotions" _stem = _GLOBAL_DEFAULT_OFF_STEMS[_feat] # Guild-scoped key when in a guild, global when in DM # 🕷️ _gkey = f"{_stem}:{_guild_id}" if _guild_id else _stem if disabled: await redis.set(_gkey, "1") else: await redis.delete(_gkey) scope_label = f"guild `{_guild_id}`" if _guild_id else "globally" state_emoji = "\U0001f534 OFF" if disabled else "\U0001f7e2 ON" await platform.send( msg.channel_id, f"**{label}** is now {state_emoji} for {scope_label}.", ) return True await feature_toggles.set_disabled( redis, feature, channel_key, disabled, guild_id=_guild_id or None, ) state_emoji = "🔴 disabled" if disabled else "🟢 enabled" await platform.send( msg.channel_id, f"**{label}** is now {state_emoji} for this channel.", ) logger.info( "Feature toggle: %s %s=%s by %s in %s", feature, "disabled" if disabled else "enabled", channel_key, msg.user_id, msg.platform, ) return True # ------------------------------------------------------------------ # Cadence force-fire: !emotions <state> # ------------------------------------------------------------------ async def _handle_cadence_force_fire( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> bool: """Handle ``!emotions <cadence_state>`` one-shot inject. Reads the target profile's weighted scoring definition from ``ncm_cadence_profiles.yaml``, computes the absolute NCM values needed to hit MUST-tier activation (score >= 1.0) with +10% headroom for decay, and injects the deltas into the channel's Redis shard. Admin-only, one-shot (values decay naturally). Returns ``True`` if the message was handled, ``False`` otherwise. """ text = (msg.text or "").strip().lower() # Must start with "!emotions " and have a state name after if not text.startswith("!emotions "): return False state_name = text[len("!emotions ") :].strip().replace(" ", "_") # Skip on/off — those are toggle commands, not force-fire if state_name in ("on", "off"): return False # -- Load profiles and check if state exists -- try: from limbic_system import _load_cadence_profiles profiles = _load_cadence_profiles() except Exception: logger.warning("Could not load cadence profiles for force-fire") return False if state_name not in profiles: # Not a known cadence state — list available ones available = sorted(profiles.keys()) await platform.send( msg.channel_id, f"Unknown cadence state `{state_name}`. " f"Available: {', '.join(f'`{s}`' for s in available)}", ) return True # -- Permission check: admin only -- _redis = self.message_cache.redis_client if self.message_cache else None if not await feature_toggles.check_toggle_permission( msg, self.config, redis=_redis ): await platform.send( msg.channel_id, "\u26d4 You don't have permission to force-fire cadence states. " "Requires: bot admin, server/channel admin, or DM.", ) return True # -- Compute required deltas from profile weights -- profile = profiles[state_name] weights = profile.get("weights", {}) if not weights: await platform.send( msg.channel_id, f"\u26a0\ufe0f Profile `{state_name}` has no weighted scoring — " f"cannot compute injection deltas.", ) return True # Target: MUST tier = score >= 1.0, with +10% headroom = 1.1 # For each node: contribution = w * |value - baseline| # So required |value - baseline| = (1.1 * w) / w = 1.1 # But we want each node to contribute proportionally to its weight # target_per_node = 1.1 / sum(weights) → but sum(w) should be 1.0 # So each node needs: deviation = 1.1 / w ... no, that's too much # Actually: score = sum(w_i * dev_i), we want score = 1.1 # Simplest: set dev_i = 1.1 for all nodes (each contributes w_i * 1.1) # Then score = 1.1 * sum(w_i) = 1.1 (if weights sum to 1.0) TARGET_SCORE = 1.1 # 💀 MUST tier + 10% headroom for decay DEVIATION = TARGET_SCORE # each node deviates by this from baseline deltas: dict[str, float] = {} for node, wdef in weights.items(): w = wdef.get("w", 0) if w <= 0: continue baseline = wdef.get("baseline", 0.5) mode = wdef.get("mode", "above") if mode == "above": # Target value = baseline + DEVIATION target_val = baseline + DEVIATION else: # mode == "below": we need the value BELOW baseline target_val = baseline - DEVIATION # Clamp to valid range target_val = max(0.0, min(3.0, target_val)) # Delta = target - current assumed baseline (0.5 default) delta = target_val - 0.5 if abs(delta) > 0.01: deltas[node] = round(delta, 3) if not deltas: await platform.send( msg.channel_id, f"\u26a0\ufe0f Could not compute deltas for `{state_name}`.", ) return True # -- Inject into Redis shard (same mechanism as inject_ncm tool) -- redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) if redis is None: await platform.send( msg.channel_id, "\u26a0\ufe0f Redis is not configured — cannot inject NCM.", ) return True db12 = None try: import redis.asyncio as aioredis pool = redis.connection_pool kwargs = pool.connection_kwargs.copy() kwargs["db"] = 12 db12 = aioredis.Redis( connection_pool=aioredis.ConnectionPool( connection_class=pool.connection_class, **kwargs, ) ) shard_key = f"db12:shard:{msg.channel_id}" raw = await db12.get(shard_key) shard = json.loads(raw) if raw else {"vector": {}, "meta_state": {}} vec = shard.get("vector", {}) # Apply deltas with Hill saturation _CEIL = 3.0 applied: dict[str, float] = {} for node, delta in deltas.items(): cur = vec.get(node, 0.5) if delta > 0: sat = 1.0 - (cur / _CEIL) ** 2 effective_delta = delta * max(0.05, sat) else: floor_prox = cur / _CEIL effective_delta = delta * max(0.05, floor_prox) new_val = max(0.0, min(_CEIL, cur + effective_delta)) vec[node] = new_val applied[node] = round(new_val, 3) shard["vector"] = vec await db12.set(shard_key, json.dumps(shard)) # Format the injection report node_lines = " ".join(f"`{k}` → {v:.2f}" for k, v in applied.items()) await platform.send( msg.channel_id, f"\U0001f480\U0001f525 **Cadence force-fire: `{state_name}`** " f"(MUST tier + 10% headroom)\n{node_lines}", ) logger.info( "Cadence force-fire: %s fired by %s in %s:%s | deltas=%s", state_name, msg.user_id, msg.platform, msg.channel_id, applied, ) logger.info("Successfully closed and disconnected Database 12 connection pool for channel: %s", msg.channel_id) except Exception as e: logger.error("Cadence force-fire failed: %s", e, exc_info=True) await platform.send( msg.channel_id, f"\u26a0\ufe0f Force-fire failed: {e}", ) finally: if db12 is not None: try: await db12.aclose() await db12.connection_pool.disconnect() except Exception as ex: logger.warning("Failed during Database 12 cleanup for channel %s: %s", msg.channel_id, str(ex)) return True _USER_LLM_CLEAR_COMMANDS: dict[str, str] = { "!clearapiurl": "api_url", "!clearmodel": "model", "!clearapikey": "api_key", "!cleartogglespecific": "toggle_specific", "!apiurl clear": "api_url", "!model clear": "model", "!apikey clear": "api_key", "!togglespecific clear": "toggle_specific", } async def _handle_user_llm_command( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> bool: """Handle ``!apiurl``, ``!model``, ``!apikey`` and clear variants. Returns ``True`` when the message was handled (inference skipped). """ text = (msg.text or "").strip() if not text: return False tl = text.lower() redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) clear_field = self._USER_LLM_CLEAR_COMMANDS.get(tl) if clear_field is not None: if redis is None: await platform.send( msg.channel_id, "⚠️ Redis is not configured — per-user LLM settings are unavailable.", ) return True err = await user_llm_config.clear_user_llm_field( redis, msg.user_id, msg.platform, msg.channel_id, clear_field, ) if err: await platform.send(msg.channel_id, f"⚠️ {err}") return True labels = { "api_url": "custom API base URL", "model": "custom model name", "api_key": "custom API key", "toggle_specific": "proxy-specific routing toggle", } await platform.send( msg.channel_id, f"✅ Cleared your {labels.get(clear_field, clear_field)} " f"for this channel (back to bot defaults).", ) logger.info( "User LLM clear: field=%s user=%s %s:%s", clear_field, msg.user_id, msg.platform, msg.channel_id, ) return True async def _apply_field(field: str, raw_value: str) -> None: err = await user_llm_config.set_user_llm_field( redis, msg.user_id, msg.platform, msg.channel_id, field, raw_value, config=self.config, ) if err: await platform.send(msg.channel_id, f"⚠️ {err}") return if field == "api_url": await platform.send( msg.channel_id, "✅ Saved your custom API base URL for this channel. " "It will be used for your messages here (OpenAI-compatible " "`/chat/completions`).", ) elif field == "model": await platform.send( msg.channel_id, "✅ Saved your custom model name for this channel.", ) elif field == "toggle_specific": status = "ENABLED" if raw_value == "1" else "DISABLED" await platform.send( msg.channel_id, f"✅ Set proxy-specific routing to **{status}** for this channel.", ) else: await platform.send( msg.channel_id, "✅ Saved your custom API key for this channel (encrypted).", ) logger.info( "User LLM set: field=%s user=%s %s:%s", field, msg.user_id, msg.platform, msg.channel_id, ) if tl.startswith("!apiurl"): if redis is None: await platform.send( msg.channel_id, "⚠️ Redis is not configured — per-user LLM settings are unavailable.", ) return True if len(tl) == len("!apiurl") and tl == "!apiurl": await platform.send( msg.channel_id, "Usage: `!apiurl <https://...>` (OpenAI-compatible base) " "or `!apiurl clear` / `!clearapiurl`.", ) return True if not tl.startswith("!apiurl "): return False arg = text[len("!apiurl ") :].strip() if arg.lower() == "clear": err = await user_llm_config.clear_user_llm_field( redis, msg.user_id, msg.platform, msg.channel_id, "api_url", ) if err: await platform.send(msg.channel_id, f"⚠️ {err}") else: await platform.send( msg.channel_id, "✅ Cleared your custom API base URL for this channel.", ) return True await _apply_field("api_url", arg) return True if tl.startswith("!model"): if redis is None: await platform.send( msg.channel_id, "⚠️ Redis is not configured — per-user LLM settings are unavailable.", ) return True if tl == "!model": await platform.send( msg.channel_id, "Usage: `!model <model-id>` or `!model clear` / `!clearmodel`.", ) return True if not tl.startswith("!model "): return False arg = text[len("!model ") :].strip() if arg.lower() == "clear": err = await user_llm_config.clear_user_llm_field( redis, msg.user_id, msg.platform, msg.channel_id, "model", ) if err: await platform.send(msg.channel_id, f"⚠️ {err}") else: await platform.send( msg.channel_id, "✅ Cleared your custom model name for this channel.", ) return True await _apply_field("model", arg) return True if tl.startswith("!apikey"): if redis is None: await platform.send( msg.channel_id, "⚠️ Redis is not configured — per-user LLM settings are unavailable.", ) return True if tl == "!apikey": await platform.send( msg.channel_id, "Usage: `!apikey <key>` or `!apikey clear` / `!clearapikey`. " "**Send keys in DM** when possible — public channels log history.", ) return True if not tl.startswith("!apikey "): return False arg = text[len("!apikey ") :].strip() if arg.lower() == "clear": err = await user_llm_config.clear_user_llm_field( redis, msg.user_id, msg.platform, msg.channel_id, "api_key", ) if err: await platform.send(msg.channel_id, f"⚠️ {err}") else: await platform.send( msg.channel_id, "✅ Cleared your custom API key for this channel.", ) return True if not msg.extra.get("is_dm", False): await platform.send( msg.channel_id, "⚠️ **Security:** You are not in a DM. Your key may be visible " "in channel history — prefer DM. Saving anyway as requested.", ) await _apply_field("api_key", arg) return True if tl.startswith("!togglespecific"): if redis is None: await platform.send( msg.channel_id, "⚠️ Redis is not configured — per-user LLM settings are unavailable.", ) return True if tl == "!togglespecific": await platform.send( msg.channel_id, "Usage: `!togglespecific <on/off>` or `!togglespecific clear` / `!cleartogglespecific`.", ) return True if not tl.startswith("!togglespecific "): return False arg = text[len("!togglespecific ") :].strip().lower() if arg == "clear": err = await user_llm_config.clear_user_llm_field( redis, msg.user_id, msg.platform, msg.channel_id, "toggle_specific", ) if err: await platform.send(msg.channel_id, f"⚠️ {err}") else: await platform.send( msg.channel_id, "✅ Cleared your proxy-specific routing toggle for this channel.", ) return True val = "1" if arg in ("on", "true", "1", "yes", "enable") else "0" await _apply_field("toggle_specific", val) return True return False # ------------------------------------------------------------------ # Batch handling # ------------------------------------------------------------------
[docs] async def handle_batch( self, batch: MessageBatch, representative: IncomingMessage, platform: PlatformAdapter, ) -> None: """Process a batch of rapid-succession messages as a single LLM call. Combines the text of every message in *batch* into one :class:`IncomingMessage` (using *representative* as the base) and delegates to :meth:`handle_message` with batch metadata set so that the system prompt includes the ``batch_response_context`` section. Messages that match :func:`~.bot_commands.is_immediate_bot_command` are handled immediately (not merged into the synthetic batch text). """ if batch.size == 1: original: IncomingMessage = batch.messages[0].extra.get( "incoming_message", representative, ) if is_immediate_bot_command(original.text): await self.handle_immediate_command(original, platform) else: await self.handle_message(original, platform) return llm_queued: list[QueuedMessage] = [] for qm in batch.messages: orig_cmd: IncomingMessage | None = qm.extra.get("incoming_message") if orig_cmd is not None and is_immediate_bot_command(orig_cmd.text): await self.handle_immediate_command(orig_cmd, platform) else: llm_queued.append(qm) if not llm_queued: return if len(llm_queued) == 1: lone = llm_queued[0].extra.get("incoming_message", representative) await self.handle_message(lone, platform) return lines: list[str] = [] all_attachments: list[Attachment] = [] for qm in llm_queued: orig: IncomingMessage | None = qm.extra.get("incoming_message") if orig is not None: ts = orig.timestamp.isoformat() prefix = ( f"[{ts}] {orig.user_name} ({orig.user_id})" f" [Message ID: {orig.message_id}]" ) if orig.reply_to_id: prefix += f" [Replying to: {orig.reply_to_id}]" if orig.reactions: prefix += f" [Reactions: {orig.reactions}]" prefix += " : " if orig.text: lines.append(prefix + orig.text) all_attachments.extend(orig.attachments) else: lines.append(qm.text) combined_text = "\n".join(line for line in lines if line) # Inherit is_addressed from the batch: if ANY message was # addressed, the combined response should be too. Otherwise # the batch must go through the normal proactive gate. batch_addressed = any( qm.extra.get("incoming_message", representative).is_addressed for qm in batch.messages ) synthetic = IncomingMessage( platform=representative.platform, channel_id=representative.channel_id, user_id=representative.user_id, user_name=representative.user_name, text=combined_text, is_addressed=batch_addressed, attachments=all_attachments, channel_name=representative.channel_name, timestamp=representative.timestamp, message_id=representative.message_id, reply_to_id=representative.reply_to_id, extra={ **representative.extra, "is_batch_response": True, "batch_pre_formatted": True, "batch_size": batch.size, "batch_authors": batch.unique_authors(), "batch_message_ids": [ (qm.extra.get("incoming_message") or representative).message_id for qm in llm_queued if (qm.extra.get("incoming_message") or representative).message_id ], }, ) logger.info( "Processing batch of %d messages on %s:%s", batch.size, synthetic.platform, synthetic.channel_id, ) await self.handle_message(synthetic, platform)
# ------------------------------------------------------------------ # Auto-disable proactive on 403 Forbidden # ------------------------------------------------------------------ async def _disable_proactive_on_forbidden( self, platform_name: str, channel_id: str, ) -> None: from .proactive_gates import run_disable_proactive_on_forbidden await run_disable_proactive_on_forbidden(self, platform_name, channel_id)
[docs] async def run_channel_heartbeat_once( self, platform: PlatformAdapter, channel_id: str, heartbeat_client: OpenRouterClient, ) -> bool: """Background channel heartbeat (flash model, no tools).""" from .channel_heartbeat import execute_channel_heartbeat # Serialize the heartbeat against the main inference turn for this # channel so it never reads or appends history concurrently with a live # turn. Skips this cycle if the lock cannot be acquired in time. try: async with self._channel_lock(channel_id): return await execute_channel_heartbeat( self, platform, channel_id, heartbeat_client, ) except TimeoutError: logger.info( "Skipped channel heartbeat for %s — channel lock busy", channel_id, ) return False
# ------------------------------------------------------------------ # Proactive response evaluation # ------------------------------------------------------------------ async def _should_respond_proactively( self, msg: IncomingMessage, ) -> bool: from .proactive_gates import run_should_respond_proactively return await run_should_respond_proactively(self, msg) # ------------------------------------------------------------------ # History backfill # ------------------------------------------------------------------ async def _backfill_history( self, history_key: str, msg: IncomingMessage, platform: PlatformAdapter, ) -> None: from .history_backfill import run_backfill_history await run_backfill_history(self, history_key, msg, platform) async def _backfill_from_redis( self, platform_name: str, channel_id: str, history_key: str, ) -> int: from .history_backfill import run_backfill_from_redis return await run_backfill_from_redis( self, platform_name, channel_id, history_key, ) async def _backfill_from_platform( self, msg: IncomingMessage, platform: PlatformAdapter, history_key: str, ) -> None: from .history_backfill import run_backfill_from_platform await run_backfill_from_platform(self, msg, platform, history_key) # ------------------------------------------------------------------ # Embedding helpers # ------------------------------------------------------------------ _EMBED_BATCH_SIZE = 50 # match embedding_queue.API_BATCH_LIMIT async def _ensure_embeddings( self, items: list[tuple[str, str]], ) -> None: """Ensure every ``(redis_key, text)`` pair gets an embedding. When the :class:`~embedding_queue.EmbeddingBatchQueue` is available the items are enqueued there (the queue handles batching internally). Otherwise, embeddings are generated directly via :meth:`OpenRouterClient.embed_batch` in chunks and written back to Redis in a pipeline. """ if not items: return if self._embedding_queue is not None: await self._embedding_queue.enqueue_many(items) return if self.message_cache is None: return redis = self.message_cache.redis_client for i in range(0, len(items), self._EMBED_BATCH_SIZE): chunk = items[i : i + self._EMBED_BATCH_SIZE] texts = [text for _, text in chunk] try: embeddings = await self.openrouter.embed_batch( texts, self.config.embedding_model, ) except Exception: logger.warning( "Direct batch embedding failed for %d items", len(chunk), exc_info=True, ) continue pipe = redis.pipeline() for (key, _), emb in zip(chunk, embeddings): blob = np.array(emb, dtype=np.float32).tobytes() pipe.hset(key, "embedding", blob) try: await pipe.execute() except Exception: logger.warning( "Failed to write %d embeddings to Redis", len(chunk), exc_info=True, ) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ @staticmethod async def _build_multimodal_content( text: str, attachments: list[Attachment], ) -> list[dict[str, Any]]: """Convert text + attachments into OpenRouter content parts. Text-decodable attachments (source code, config files, plain text, etc.) are decoded and inlined directly into the text body so the LLM can read them regardless of proxy multimodal support. Binary attachments (images, PDFs, audio, video) are kept as multimodal content parts. """ text_atts: list[Attachment] = [] binary_atts: list[Attachment] = [] for att in attachments: if _is_text_decodable(att.mimetype, att.filename, att.data): text_atts.append(att) else: binary_atts.append(att) async def _inline_text_attachments() -> str: body = text or "" for att in text_atts: try: decoded = att.data.decode("utf-8") except UnicodeDecodeError: decoded = att.data.decode("latin-1") decoded, was_truncated = await maybe_truncate_unreadable( decoded, att.filename, att.mimetype, ) if was_truncated: logger.info( "Truncated unreadable attachment %s (%s, %d bytes) " "for LLM payload", att.filename, att.mimetype, len(att.data), ) body += ( f"\n\n[Attached file: {att.filename}]\n" f"{decoded}\n" f"[End of file: {att.filename}]" ) return body text_body = await _inline_text_attachments() if text_atts else (text or "") for att in text_atts: logger.debug( "Inlined text file %s (%s, %d bytes) into message body", att.filename, att.mimetype, len(att.data), ) multimodal_parts: list[dict[str, Any]] = [] stripped_notices: list[str] = [] for att in binary_atts: if len(att.data) > _MAX_USER_TEXT_ATTACHMENT_BYTES and _is_text_decodable( att.mimetype, att.filename, att.data, max_bytes=None ): size_mb = len(att.data) / (1024 * 1024) stripped_notices.append( f"[Attachment: {att.filename} ({size_mb:.1f} MB) — " f"file too large to include in context, contents stripped]" ) logger.warning( "Stripped large text attachment %s (%s, %.1f MB) from LLM payload", att.filename, att.mimetype, size_mb, ) continue att_parts = await media_to_content_parts( att.data, att.mimetype, att.filename, ) multimodal_parts.extend(att_parts) logger.debug( "Kept %s (%s, %d bytes) as %d multimodal part(s)", att.filename, att.mimetype, len(att.data), len(att_parts), ) if stripped_notices: text_body = (text_body or "") + "\n\n" + "\n".join(stripped_notices) parts: list[dict[str, Any]] = [] if text_body: parts.append({"type": "text", "text": text_body}) parts.extend(multimodal_parts) return parts if parts else [{"type": "text", "text": ""}] async def _download_and_patch_video( self, url: str, user_id: str, history_key: str, message_id: str, metadata: dict[str, Any], cookies_text: str | None = None, downloading_annotation: str = "", ) -> None: from .video_history_patch import run_download_and_patch_video await run_download_and_patch_video( self, url, user_id, history_key, message_id, metadata, cookies_text=cookies_text, downloading_annotation=downloading_annotation, ) async def _extract_knowledge_from_message( self, msg: IncomingMessage, ) -> None: """Fire-and-forget per-message knowledge extraction. Three cheap gates run *before* any LLM call: length, heuristic regex, and per-user rate limit. """ try: from kg_extraction import extract_from_message redis = ( self.message_cache.redis_client if self.message_cache is not None else None ) await extract_from_message( message_text=msg.text, user_id=msg.user_id, user_name=msg.user_name, channel_id=msg.channel_id, guild_id=msg.extra.get("guild_id") or None, openrouter=self.openrouter, kg_manager=self.kg_manager, redis=redis, per_user_limit=(self.config.kg_per_user_extraction_limit), override_model=getattr( self.config, "kg_extraction_model", None, ), ) except Exception: logger.warning( "Per-message knowledge extraction failed", exc_info=True, )