Source code for message_processor.processor
"""Central message processor shared across all platforms.
Receives :class:`~platforms.base.IncomingMessage` instances from any
:class:`~platforms.base.PlatformAdapter`, manages conversation history,
calls the LLM via :class:`~openrouter_client.OpenRouterClient`, and
sends the reply back through the originating platform.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
import time
import re
import traceback
import httpx
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Awaitable, Callable, Coroutine, Optional, Type
from types import TracebackType
from observability import observability
import numpy as np
from config import Config
from conversation import ConversationManager
from message_queue import MessageBatch, QueuedMessage
from observability import publish_message_observability_event, publish_debug_event
from openrouter_client import OpenRouterClient
from platforms.base import Attachment, IncomingMessage, PlatformAdapter
from platforms.media_common import media_to_content_parts
from prompt_context import PromptContextBuilder
import ego_ablation
import feature_toggles
import user_llm_config
from patience_engine import ShadowBanManager, ShadowEffect, _simulate_retry_exhaust
from url_content_extractor import extract_all_url_content
from .bot_commands import is_immediate_bot_command
from .memory_linked_context import MemoryLinkedContextPayload
from .population_gate import _POPULATION_EXEMPT_GUILDS, _POPULATION_LIMIT
from .text_attachments import _is_text_decodable
from .unreadable_text import maybe_truncate_unreadable
from .user_message_format import format_user_content
# Size cap, in bytes, for a single user-uploaded text attachment.
# User-uploaded text attachments larger than this are stripped from the LLM
# payload and replaced with a single-line notice. Memory-linked files, tool
# results, and assistant history use separate call paths and are unaffected.
_MAX_USER_TEXT_ATTACHMENT_BYTES = 2 * 1024 * 1024 # 2 MiB (2,097,152 bytes)
if TYPE_CHECKING:
from classifiers.vector_classifier import VectorClassifier
from embedding_queue import EmbeddingBatchQueue
from knowledge_graph import KnowledgeGraphManager
from message_cache import MessageCache
from rag_system.auto_search import RAGAutoSearchManager
from status_manager import StatusManager
from task_manager import TaskManager
from threadweave import ThreadweaveManager
from web_search_context import WebSearchContextManager
logger = logging.getLogger(__name__)
# Strong references to in-flight background tasks.  The event loop only keeps
# weak references to tasks, so a task nobody else holds on to may be garbage
# collected before it finishes.  Entries remove themselves on completion.
_BACKGROUND_TASKS: "set[asyncio.Task]" = set()


def _fire_and_forget(coro, *, name: str = "") -> asyncio.Task:
    """Schedule *coro* as a background task whose failures are logged, not raised.

    Args:
        coro: Awaitable to run in the background.
        name: Optional task name; an empty string lets asyncio pick one.

    Returns:
        The created task.  Callers may ignore it: a reference is retained
        internally until the task completes.

    Raises:
        RuntimeError: If there is no running event loop.
    """

    async def _wrapper():
        try:
            await coro
        except asyncio.CancelledError:
            # Cancellation is an expected way for background work to end.
            pass
        except Exception:
            logger.warning("Fire-and-forget task %r failed", name, exc_info=True)

    task = asyncio.create_task(_wrapper(), name=name or None)
    # Keep the task alive until it is done, then drop the reference.
    _BACKGROUND_TASKS.add(task)
    task.add_done_callback(_BACKGROUND_TASKS.discard)
    return task
class ChannelMutex:
    """Async context manager that serializes per-channel work through a Redis lock.

    The lock is one key written with ``SET ... NX EX``.  It is released with a
    compare-and-delete script, so only the token written by this instance can
    remove the key.  The lease is not renewed by this class: work that outlives
    ``lease_seconds`` keeps running after the key has expired.
    """

    def __init__(
        self,
        redis_client: Any,
        channel_id: str,
        lease_seconds: int = 30,
        *,
        raise_on_timeout: bool = True,
        wait_timeout_seconds: float = 10.0,
    ) -> None:
        """Prepare (but do not acquire) the lock for *channel_id*.

        Args:
            redis_client: Async Redis client exposing ``set`` and ``eval``.
            channel_id: Channel identifier embedded in the lock key.
            lease_seconds: Expiry, in seconds, set on the lock key.
            raise_on_timeout: When ``False``, failing to acquire the lock is
                logged and the body runs without it instead of raising.
            wait_timeout_seconds: How long ``__aenter__`` polls before giving up.
        """
        import uuid

        self.redis = redis_client
        self.lock_key = f"lock:channel:processing:{channel_id}"
        self.lease = lease_seconds
        # ``asyncio.current_task()`` raises when no loop is running; fall back
        # to the "main" label in that case as well as when it returns None.
        try:
            current_task = asyncio.current_task()
        except RuntimeError:
            current_task = None
        task_name = current_task.get_name() if current_task else "main"
        # The token must be unique per holder.  Task names repeat across
        # workers, so without the random suffix a holder whose lease expired
        # could delete the lock of a successor that shares its task name.
        self.lock_value = f"worker:{task_name}:{uuid.uuid4().hex}"
        self.acquired = False
        # When False, a failure to acquire within ``wait_timeout_seconds`` is
        # best-effort: log + proceed without the lock instead of raising. Used
        # by background updates (edits/deletes/reactions/heartbeat) that must
        # still apply even under contention, while normally serializing against
        # the main inference turn for the same channel.
        self.raise_on_timeout = raise_on_timeout
        self.wait_timeout_seconds = wait_timeout_seconds

    async def __aenter__(self) -> "ChannelMutex":
        """Poll for the lock until it is acquired or the wait budget is spent.

        Returns:
            This instance; ``acquired`` tells whether the lock is actually held.

        Raises:
            TimeoutError: If the lock was not acquired in time and
                ``raise_on_timeout`` is true.
        """
        timeout = self.wait_timeout_seconds  # Max block wait time before requeueing
        elapsed = 0.0
        interval = 0.2
        while elapsed < timeout:
            with observability.timer("redis_lock_acquire_latency", subsystem="message_processor"):
                res = await self.redis.set(
                    self.lock_key, self.lock_value, ex=self.lease, nx=True
                )
            if res:
                self.acquired = True
                observability.increment(
                    "channel_lock_acquired", {"channel": self.lock_key}
                )
                return self
            await asyncio.sleep(interval)
            elapsed += interval
        observability.increment(
            "message_requeued_lock_collision", {"channel": self.lock_key}
        )
        if not self.raise_on_timeout:
            logger.warning(
                "Proceeding without channel lock %s after %.1fs (best-effort)",
                self.lock_key,
                timeout,
            )
            return self
        raise TimeoutError(
            f"Failed to acquire channel lock for key: {self.lock_key} within {timeout}s."
        )

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Release the lock if this instance holds it; never suppresses exceptions."""
        if not self.acquired:
            return
        # Delete the key only while it still carries our token, so an expired
        # lease that someone else re-acquired is left alone.
        lua_release = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        try:
            await self.redis.eval(lua_release, 1, self.lock_key, self.lock_value)
            observability.increment(
                "channel_lock_released", {"channel": self.lock_key}
            )
            logger.debug("Released lock: %s", self.lock_key)
        except Exception as e:
            logger.warning("Error releasing lock %s: %s", self.lock_key, str(e))
        finally:
            # A reused instance must not believe it still owns the lock.
            self.acquired = False
class ClientRegistry:
    """Owns the shared ``httpx.AsyncClient`` used for outbound platform/REST calls."""

    def __init__(self) -> None:
        """Start with no client; call :meth:`initialize` to create one."""
        self.http_client: Optional[httpx.AsyncClient] = None

    async def initialize(self) -> None:
        """Create the pooled HTTP client unless a usable one already exists.

        Calling this again while a client is open is a no-op, so a repeated
        call cannot orphan a connection pool that was never closed.
        """
        existing = self.http_client
        if existing is not None and not existing.is_closed:
            return
        limits = httpx.Limits(max_keepalive_connections=50, max_connections=200)
        self.http_client = httpx.AsyncClient(
            limits=limits, timeout=httpx.Timeout(30.0, connect=5.0)
        )

    async def shutdown(self) -> None:
        """Close the client, if any, and forget it.

        The reference is cleared before closing so the registry never keeps a
        half-closed client around when ``aclose()`` raises.
        """
        client, self.http_client = self.http_client, None
        if client is not None:
            await client.aclose()
# (compiled pattern, replacement) pairs applied in order by
# ``redact_sensitive_data``.  Compiled once at import time.
# NOTE(review): the ``xox?-`` prefix looks like the Slack token format even
# though the replacement label says DISCORD; the label is kept unchanged in
# case log consumers match on it — confirm before renaming.
_REDACTION_RULES = tuple(
    (re.compile(pattern), replacement)
    for pattern, replacement in (
        # ``[pbay]`` rather than ``[p|b|a|y]``: inside a character class ``|``
        # is a literal pipe, not alternation.
        (r"xox[pbay]-[0-9a-zA-Z\-]+", "[REDACTED-DISCORD-TOKEN]"),
        (r"ghp_[0-9a-zA-Z]{36}", "[REDACTED-GITHUB-TOKEN]"),
        (r"sk-or-v1-[0-9a-f]{64}", "[REDACTED-OPENROUTER-KEY]"),
        (r"(?i)password\s*=\s*['\"][^'\"]+['\"]", "password=[REDACTED]"),
        (
            r"(?i)wallet_master_key\s*=\s*['\"][^'\"]+['\"]",
            "wallet_master_key=[REDACTED]",
        ),
    )
)


def redact_sensitive_data(text: str) -> str:
    """Return *text* with known token and credential patterns replaced.

    Only the patterns listed in ``_REDACTION_RULES`` are covered: three token
    formats and quoted ``password=`` / ``wallet_master_key=`` assignments.
    Anything else passes through unchanged.

    Args:
        text: Text (typically a traceback) to scrub.

    Returns:
        The scrubbed text; an empty string stays empty.
    """
    redacted = text
    for pattern, replacement in _REDACTION_RULES:
        redacted = pattern.sub(replacement, redacted)
    return redacted
async def safe_command_dispatcher(
    ctx: Any,
    command_func: Callable[..., Coroutine[Any, Any, str]],
    *args: Any,
    **kwargs: Any,
) -> str:
    """Run a command coroutine and convert any failure into a generic reply.

    Args:
        ctx: Command context; its ``correlation_id`` attribute (if present) is
            attached to the log line and the alert.
        command_func: Coroutine function called as
            ``command_func(ctx, *args, **kwargs)``.
        *args: Positional arguments forwarded to ``command_func``.
        **kwargs: Keyword arguments forwarded to ``command_func``.

    Returns:
        The command's own result on success, otherwise a fixed user-facing
        error message.  Exceptions derived from ``Exception`` are not
        propagated.
    """
    correlation_id = getattr(ctx, "correlation_id", "unknown")
    try:
        with observability.timer("command_execution_time", subsystem="message_processor"):
            return await command_func(ctx, *args, **kwargs)
    except Exception as e:
        redacted_traceback = redact_sensitive_data(traceback.format_exc())
        # The exception message is part of the traceback text, so it can carry
        # the same secrets; scrub it too before it reaches logs or alerts.
        redacted_message = redact_sensitive_data(str(e))
        logger.error(
            "Exception occurred in command dispatcher [ID: %s]: %s\n%s",
            correlation_id,
            redacted_message,
            redacted_traceback,
        )
        observability.increment("command_error", {"exception": e.__class__.__name__})
        observability.alert(
            "ERROR",
            f"Command dispatch crashed: {redacted_message}",
            metadata={"trace": redacted_traceback, "correlation_id": correlation_id},
        )
        return "⚠️ An internal error occurred while processing your request. The issue has been securely logged."
[docs]
class MessageProcessor:
"""Platform-agnostic message handler.
Parameters
----------
config:
Global bot configuration (forwarded to the context builder).
conversation_manager:
Manages per-channel conversation histories.
openrouter:
The OpenRouter API client for LLM completions.
message_cache:
Optional Redis-backed message cache. When provided, every
message the bot sees is logged with an embedding vector.
"""
[docs]
def __init__(
    self,
    config: Config,
    conversation_manager: ConversationManager,
    openrouter: OpenRouterClient,
    message_cache: MessageCache | None = None,
    kg_manager: KnowledgeGraphManager | None = None,
    auto_search: RAGAutoSearchManager | None = None,
    task_manager: TaskManager | None = None,
    classifier: VectorClassifier | None = None,
    threadweave: ThreadweaveManager | None = None,
    embedding_queue: EmbeddingBatchQueue | None = None,
    status_manager: StatusManager | None = None,
    web_search: WebSearchContextManager | None = None,
    persona_pref_manager: Any | None = None,
    shadow_ban_manager: ShadowBanManager | None = None,
    sword_graph_manager: Any | None = None,
    visual_memory: Any | None = None,
) -> None:
    """Store collaborators, build helper objects, and start the cancel listener.

    Args:
        config (Config): Bot configuration; kept on ``self.config`` and
            handed to the prompt-context builder.
        conversation_manager (ConversationManager): Kept on
            ``self.conversation`` and handed to the prompt-context builder.
        openrouter (OpenRouterClient): Kept on ``self.openrouter`` and
            handed to the prompt-context builder.
        message_cache (MessageCache | None): Optional cache. When it is
            not ``None`` and an event loop is running, the cancellation
            daemon task is started.
        kg_manager (KnowledgeGraphManager | None): Kept on ``self.kg_manager``.
        auto_search (RAGAutoSearchManager | None): Kept on ``self._auto_search``.
        task_manager (TaskManager | None): Kept on ``self._task_manager``.
        classifier (VectorClassifier | None): Kept on ``self._classifier``.
        threadweave (ThreadweaveManager | None): Kept on ``self._threadweave``.
        embedding_queue (EmbeddingBatchQueue | None): Kept on
            ``self._embedding_queue``.
        status_manager (StatusManager | None): Kept on ``self._status_manager``.
        web_search (WebSearchContextManager | None): Kept on ``self._web_search``.
        persona_pref_manager (Any | None): Kept on ``self.persona_pref_manager``.
        shadow_ban_manager (ShadowBanManager | None): Kept on ``self._shadow_ban``.
        sword_graph_manager (Any | None): Only forwarded to the
            prompt-context builder; not stored on the instance.
        visual_memory (Any | None): Kept on ``self._visual_memory``.
    """
    # Public collaborators.
    self.config = config
    self.conversation = conversation_manager
    self.openrouter = openrouter
    self.message_cache = message_cache
    self.kg_manager = kg_manager

    # Private collaborators.
    self._auto_search = auto_search
    self._task_manager = task_manager
    self._classifier = classifier
    self._threadweave = threadweave
    self._embedding_queue = embedding_queue
    self._status_manager = status_manager
    self._web_search = web_search
    self.persona_pref_manager = persona_pref_manager
    self._shadow_ban = shadow_ban_manager
    self._visual_memory = visual_memory

    self.client_registry = ClientRegistry()
    self._backfilled_channels: set[str] = set()

    builder_kwargs = dict(
        kg_manager=kg_manager,
        threadweave_manager=threadweave,
        status_manager=status_manager,
        message_cache=message_cache,
        task_manager=task_manager,
        conversation_manager=conversation_manager,
        openrouter_client=self.openrouter,
        persona_pref_manager=persona_pref_manager,
        sword_graph_manager=sword_graph_manager,
    )
    self._ctx_builder = PromptContextBuilder(config, **builder_kwargs)

    # Optional cadence post-processor: absent module means "feature off".
    try:
        from cadence_refiner import CadencePostProcessor

        cadence = CadencePostProcessor()
    except ImportError:
        cadence = None
        logger.debug("cadence_refiner not available")
    self._cadence_processor = cadence

    from message_processor.command_router import CommandRouter

    self.command_router = CommandRouter(self)

    # The cancellation daemon needs both a cache (for Redis) and a running
    # loop; without a loop (sync scripts, unit tests) it is simply not started.
    daemon_task = None
    if self.message_cache is not None:
        try:
            daemon_task = asyncio.get_running_loop().create_task(
                self._run_cancellation_daemon(),
                name="sg-worker-cancellation-daemon",
            )
        except RuntimeError:
            daemon_task = None
    self._cancellation_daemon_task = daemon_task
# ------------------------------------------------------------------
# Distributed Response Cancellation (Worker Daemon)
# ------------------------------------------------------------------
async def _run_cancellation_daemon(self) -> None:
"""Background daemon subscribing to 'sg:channel:cancel' to abort local turns."""
if self.message_cache is None:
return
redis = self.message_cache.redis_client
pubsub = redis.pubsub()
try:
await pubsub.subscribe("sg:channel:cancel")
logger.info("WorkerCancellationDaemon successfully subscribed to 'sg:channel:cancel'")
async for message in pubsub.listen():
if message["type"] != "message":
continue
try:
data_str = (
message["data"]
if isinstance(message["data"], str)
else message["data"].decode()
)
data = json.loads(data_str)
platform = data.get("platform")
channel_id = data.get("channel_id")
channel_key = data.get("channel_key", f"{platform}:{channel_id}")
user_id = data.get("user_id")
logger.info(
"WorkerCancellationDaemon: Received cancel signal for channel %s initiated by %s",
channel_key,
user_id,
)
cancelled_any = False
for task in asyncio.all_tasks():
if task.get_name() == f"inference:{channel_key}":
logger.info(
"WorkerCancellationDaemon: Tagging task %s as user-cancelled and executing task.cancel()",
task.get_name()
)
setattr(task, "_user_cancelled", True)
task.cancel()
cancelled_any = True
if not cancelled_any:
logger.debug(
"WorkerCancellationDaemon: No active task named 'inference:%s' found on this worker.",
channel_key
)
except Exception:
logger.exception("WorkerCancellationDaemon failed to process cancellation message")
except asyncio.CancelledError:
logger.info("WorkerCancellationDaemon cancelled")
except Exception:
logger.exception("WorkerCancellationDaemon listener crashed")
finally:
try:
await pubsub.unsubscribe("sg:channel:cancel")
await pubsub.aclose()
except Exception:
pass
[docs]
async def shutdown(self) -> None:
"""Shutdown background processes and cancellation daemons."""
if hasattr(self, "_cancellation_daemon_task") and self._cancellation_daemon_task and not self._cancellation_daemon_task.done():
logger.info("Stopping WorkerCancellationDaemon...")
self._cancellation_daemon_task.cancel()
try:
await self._cancellation_daemon_task
except asyncio.CancelledError:
pass
logger.info("WorkerCancellationDaemon stopped.")
async def _cleanup_egregore_webhooks(
self,
platform: PlatformAdapter,
discord_cleanups: list[tuple[str, str]],
) -> None:
"""Delete egregore Discord webhooks, working from any process.
On the gateway the adapter exposes the live ``client`` and we delete
directly; on an inference worker the adapter is a
:class:`core.proxy_adapter.ProxyPlatformAdapter` (no live client), so we
delegate the deletion to the gateway over RPC
(``delete_egregore_webhook``). Pre-T3 this used ``getattr(platform,
"client", None)`` unconditionally and silently no-opped on workers,
orphaning the webhooks.
"""
if not discord_cleanups:
return
_client = getattr(platform, "client", None) or getattr(platform, "_client", None)
delegate = getattr(platform, "delegate_to_gateway", None)
for gid, whid in discord_cleanups:
try:
if _client is not None:
from tools._egregore_discord import delete_egregore_webhook_by_id
await delete_egregore_webhook_by_id(_client, gid, whid)
elif delegate is not None:
await delegate(
"delete_egregore_webhook", guild_id=gid, webhook_id=whid
)
else:
logger.debug(
"No client or gateway delegate to delete webhook %s/%s",
gid, whid,
)
except Exception:
logger.debug(
"egregore webhook cleanup failed gid=%s wid=%s",
gid, whid, exc_info=True,
)
# ------------------------------------------------------------------
# Pending-response tracking (Redis)
# ------------------------------------------------------------------
# Redis key prefix for in-flight response markers; the full key is
# ``<prefix><platform>:<channel_id>``.
_PENDING_PREFIX = "pending_response:"
# Marker lifetime in seconds, applied with EXPIRE when the marker is written.
_PENDING_TTL = 3600 # 1 hour safety net
async def _mark_pending(self, msg: IncomingMessage) -> None:
"""Write a Redis marker for an in-flight response.
The marker is removed by :meth:`_clear_pending` once the reply
has been sent. If the bot crashes in between, the marker
survives and is picked up by :meth:`recover_pending_responses`
on the next startup.
"""
if self.message_cache is None:
return
key = f"{self._PENDING_PREFIX}{msg.platform}:{msg.channel_id}"
mapping = {
"platform": msg.platform,
"channel_id": msg.channel_id,
"user_id": msg.user_id,
"user_name": msg.user_name,
"text": msg.text or "",
"message_id": msg.message_id or "",
"reply_to_id": msg.reply_to_id or "",
"channel_name": msg.channel_name or "",
"is_addressed": "1" if msg.is_addressed else "0",
"timestamp": msg.timestamp.isoformat(),
"started_at": datetime.now(timezone.utc).isoformat(),
}
try:
pipe = self.message_cache.redis_client.pipeline()
pipe.hset(key, mapping=mapping)
pipe.expire(key, self._PENDING_TTL)
await pipe.execute()
except Exception:
logger.warning("Failed to mark pending response", exc_info=True)
async def _clear_pending(self, msg: IncomingMessage) -> None:
"""Remove the in-flight marker after a response completes."""
if self.message_cache is None:
return
key = f"{self._PENDING_PREFIX}{msg.platform}:{msg.channel_id}"
try:
await self.message_cache.redis_client.delete(key)
except Exception:
logger.debug("Failed to clear pending response", exc_info=True)
[docs]
async def recover_pending_responses(
self,
enqueue_callback: Callable[[IncomingMessage, PlatformAdapter], Awaitable[None]],
adapters: list[PlatformAdapter],
) -> int:
"""Re-enqueue messages whose responses were interrupted by a restart.
Called once during startup after all platform adapters are
running. Returns the number of recovered messages.
"""
if self.message_cache is None:
return 0
adapter_map: dict[str, PlatformAdapter] = {a.name: a for a in adapters}
redis = self.message_cache.redis_client
recovered = 0
try:
cursor: int | bytes = 0
keys: list[bytes | str] = []
while True:
cursor, batch = await redis.scan(
cursor,
match=f"{self._PENDING_PREFIX}*",
count=100,
)
keys.extend(batch)
if not cursor:
break
except Exception:
logger.warning(
"Failed to scan for pending responses",
exc_info=True,
)
return 0
for raw_key in keys:
key = raw_key.decode() if isinstance(raw_key, bytes) else raw_key
try:
data = await redis.hgetall(key)
if not data:
await redis.delete(key)
continue
def _s(field: str) -> str:
v = data.get(field, b"")
return v.decode() if isinstance(v, bytes) else (v or "")
platform_name = _s("platform")
adapter = adapter_map.get(platform_name)
if adapter is None:
logger.warning(
"No adapter for pending response platform %r — deleting key %s",
platform_name,
key,
)
await redis.delete(key)
continue
ts_raw = _s("timestamp")
try:
ts = datetime.fromisoformat(ts_raw)
except (ValueError, TypeError):
ts = datetime.now(timezone.utc)
msg = IncomingMessage(
platform=platform_name,
channel_id=_s("channel_id"),
user_id=_s("user_id"),
user_name=_s("user_name"),
text=_s("text"),
is_addressed=_s("is_addressed") == "1",
channel_name=_s("channel_name"),
timestamp=ts,
message_id=_s("message_id"),
reply_to_id=_s("reply_to_id"),
extra={"is_recovery": True},
)
await redis.delete(key)
_fire_and_forget(
publish_debug_event(
"pending_recovery",
"message_processor",
platform=platform_name,
channel_id=msg.channel_id,
user_id=msg.user_id,
preview=msg.text[:500] if msg.text else "",
payload={
"original_ts": ts_raw,
"recovered_at": datetime.now(timezone.utc).isoformat(),
},
),
name="obs_pending_recovery",
)
await enqueue_callback(msg, adapter)
recovered += 1
logger.info(
"Recovered pending response for %s:%s (user=%s)",
platform_name,
msg.channel_id,
msg.user_name,
)
except Exception:
logger.warning(
"Failed to recover pending response from %s",
key,
exc_info=True,
)
try:
await redis.delete(key)
except Exception:
pass
if recovered:
logger.info("Recovered %d pending response(s) from previous run", recovered)
return recovered
async def _is_backfilled(self, history_key: str) -> bool:
"""Check if history for history_key has been backfilled."""
if self.message_cache is not None:
try:
return bool(await self.message_cache.redis_client.sismember("sg:backfilled", history_key))
except Exception:
pass
return history_key in self._backfilled_channels
async def _record_backfilled(self, history_key: str) -> None:
"""Mark history_key as backfilled in Redis set and local set."""
self._backfilled_channels.add(history_key)
if self.message_cache is not None:
try:
redis = self.message_cache.redis_client
await redis.sadd("sg:backfilled", history_key)
await redis.expire("sg:backfilled", 86400) # 24h TTL
except Exception:
pass
async def _discard_backfilled(self, history_key: str) -> None:
"""Remove history_key from backfilled sets."""
self._backfilled_channels.discard(history_key)
if self.message_cache is not None:
try:
await self.message_cache.redis_client.srem("sg:backfilled", history_key)
except Exception:
pass
async def _begin_backfill(self, history_key: str) -> bool:
"""Acquire a short-lived in-progress marker for a channel backfill.
Returns ``True`` when this caller may proceed with the backfill, or
``False`` when another handler is already backfilling the same channel
(so we avoid a duplicate load that would double-append history or race
the platform dedup snapshot). Fail-open: returns ``True`` when Redis is
unavailable so a backfill is never blocked by the marker.
"""
if self.message_cache is None:
return True
try:
got = await self.message_cache.redis_client.set(
f"sg:backfilling:{history_key}",
"1",
ex=120,
nx=True,
)
return bool(got)
except Exception:
return True
async def _end_backfill(self, history_key: str) -> None:
"""Release the in-progress backfill marker."""
if self.message_cache is None:
return
try:
await self.message_cache.redis_client.delete(
f"sg:backfilling:{history_key}",
)
except Exception:
pass
def _channel_lock(self, channel_id: str, *, raise_on_timeout: bool = True):
"""Per-channel processing mutex shared with the main inference turn.
Background updates (message edits/deletes/reactions and channel
heartbeats) serialize against an in-flight turn for the same channel.
Returns a no-op context when Redis is unavailable. On lock timeout the
mutation is skipped rather than applied without the lock.
"""
import contextlib
redis = (
self.message_cache.redis_client
if self.message_cache is not None
else None
)
if redis is None:
return contextlib.nullcontext()
return ChannelMutex(
redis,
channel_id,
raise_on_timeout=raise_on_timeout,
wait_timeout_seconds=30.0,
)
async def _resolve_identity(self, msg: IncomingMessage) -> None:
"""Resolve ingress identity and populate unified ID / aliases on message."""
if msg.extra.get("_identity_resolved"):
return
redis_client = self.message_cache.redis_client if self.message_cache else None
if redis_client is not None:
try:
from services.identity_registry import IdentityRegistry
uuid, aliases = await IdentityRegistry.resolve_ingress_identity(
msg.platform, msg.user_id, redis_client
)
msg.unified_user_id = uuid
msg.user_aliases = aliases
except Exception as e:
logger.error("Error resolving identity for %s:%s - %s", msg.platform, msg.user_id, e)
msg.extra["_identity_resolved"] = True
def _restricted_access_blocks(self, msg: IncomingMessage) -> bool:
    """Return ``True`` when the population limit refuses this message.

    A message is refused when ``msg.extra["member_count"]`` exceeds
    ``_POPULATION_LIMIT`` and its guild id is not listed in
    ``_POPULATION_EXEMPT_GUILDS``.  Missing or falsy values count as ``0``
    members and an empty guild id.  A refusal is logged as a warning.
    """
    extra = msg.extra
    member_count = extra.get("member_count", 0) or 0
    guild_id = str(extra.get("guild_id", "") or "")

    within_limit = not (member_count > _POPULATION_LIMIT)
    if within_limit or guild_id in _POPULATION_EXEMPT_GUILDS:
        return False

    logger.warning(
        "🚨 RESTRICTED ACCESS PROTOCOL: environment '%s' has %d "
        "members (limit: %d) — classified as hostile, refusing "
        "to operate",
        extra.get("guild_name") or msg.channel_name,
        member_count,
        _POPULATION_LIMIT,
    )
    return True
# ------------------------------------------------------------------
# Public handler -- called by every platform adapter
# ------------------------------------------------------------------
[docs]
async def handle_immediate_command(
self,
msg: IncomingMessage,
platform: PlatformAdapter,
*,
_population_checked: bool = False,
) -> None:
"""Handle ``!`` admin/toggle/user-LLM commands without embed or LLM."""
await self._resolve_identity(msg)
if not _population_checked and self._restricted_access_blocks(msg):
return
# D2-01: Enforce STARGAZER_USE here (not just in handle_message) so
# that callers like handle_batch that bypass handle_message cannot
# skip the access gate.
from tools.alter_privileges import has_scoped_privilege, PRIVILEGES
_redis_use = self.message_cache.redis_client if self.message_cache else None
_guild_id_use = str(msg.extra.get("guild_id", "") or "")
_channel_id_use = str(
msg.extra.get("channel_id_raw", msg.channel_id) or "",
)
try:
_has_access_imm = _redis_use is not None and await has_scoped_privilege(
_redis_use,
msg.user_id,
PRIVILEGES["STARGAZER_USE"],
self.config,
guild_id=_guild_id_use,
channel_id=_channel_id_use,
user_aliases=msg.user_aliases,
)
except Exception:
logger.warning(
"[security] STARGAZER_USE Redis error (immediate_cmd) user %s — failing closed",
msg.user_id,
exc_info=True,
)
_has_access_imm = False
if not _has_access_imm:
logger.debug(
"[security] STARGAZER_USE denied (immediate_cmd): user=%s redis=%s",
msg.user_id,
"ok" if _redis_use else "unavailable",
)
return # Fail-closed
history_key = f"{msg.platform}:{msg.channel_id}"
from observability import generate_request_id
msg.extra.setdefault("observability_request_id", generate_request_id())
text_lower = (msg.text or "").strip().lower()
# -- Identity Aliasing: !link and !link confirm --
if text_lower == "!link" or text_lower.startswith("!link "):
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(msg.channel_id, "❌ Database connection not available.")
return
parts = msg.text.strip().split()
if len(parts) == 1:
await platform.send(
msg.channel_id,
"ℹ️ **Account Aliasing Usage:**\n"
"• Initiate link: `!link <target_platform> <target_user_id>` (e.g. `!link matrix @username:matrix.org`)\n"
"• Confirm link: `!link confirm <code>`"
)
return
subcmd = parts[1].lower()
if subcmd == "confirm":
if len(parts) < 3:
await platform.send(
msg.channel_id,
"❌ Please provide the 6-character link code: `!link confirm <code>`"
)
return
code = parts[2].upper()
try:
from services.identity_registry import IdentityRegistry
success, result = await IdentityRegistry.confirm_link_token(
confirming_platform=msg.platform,
confirming_user_id=msg.user_id,
link_code=code,
redis_client=_redis,
)
if success:
await platform.send(
msg.channel_id,
f"✅ **Success!** Accounts linked successfully under unified identity `{result}`."
)
msg.extra["_identity_resolved"] = False
await self._resolve_identity(msg)
else:
await platform.send(msg.channel_id, f"❌ **Error:** {result}")
except Exception as ex:
logger.exception("Error during account confirmation link")
await platform.send(msg.channel_id, f"❌ **An unexpected error occurred:** {ex}")
return
else:
if len(parts) < 3:
await platform.send(
msg.channel_id,
"❌ Invalid format. Use: `!link <target_platform> <target_user_id>` (e.g., `!link matrix @user:matrix.org`)"
)
return
target_plat = parts[1].lower()
target_uid = parts[2]
if target_plat == msg.platform and target_uid == msg.user_id:
await platform.send(
msg.channel_id,
"❌ You cannot link an account to itself."
)
return
try:
from services.identity_registry import IdentityRegistry
code = await IdentityRegistry.create_link_token(
initiator_platform=msg.platform,
initiator_user_id=msg.user_id,
target_platform=target_plat,
target_user_id=target_uid,
redis_client=_redis,
)
await platform.send(
msg.channel_id,
f"🔒 **Verification Token Generated!**\n"
f"Pairing code: **`{code}`**\n"
f"To complete the link, switch to your **{target_plat}** account and run:\n"
f"`!link confirm {code}`\n"
f"*(This token is locked to {target_plat}:{target_uid} and will expire in 5 minutes)*"
)
except Exception as ex:
logger.exception("Error during account initiation link")
await platform.send(msg.channel_id, f"❌ **Error initiating link:** {ex}")
return
# -- Scene Consent Gate: !consent # 💀🔥😈 --
# Scene-level consent model. Users approve scenes they're willing
# to enter. Safe scenes need no consent. Negotiation scenes do.
# Usage: !consent approve all | !consent <scene> | !consent deny <scene> | !consent status
if text_lower == "!consent" or text_lower.startswith("!consent "):
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(msg.channel_id, "\u274c Redis unavailable.")
return
parts = (msg.text or "").strip().split()
# !consent (no args) or !consent status -> show current state
if len(parts) == 1 or (len(parts) == 2 and parts[1].lower() == "status"):
try:
from chaos_switch._scenes import (
get_scene_consent, NEGOTIATION_SCENES,
SCENE_CONSENT_DESCRIPTIONS,
)
approved = await get_scene_consent(_redis, msg.user_id)
neg_approved = approved & NEGOTIATION_SCENES
neg_denied = NEGOTIATION_SCENES - approved
lines = ["\U0001f512 **Scene Consent Status**\n"]
if neg_approved:
lines.append("**Approved:**")
for s in sorted(neg_approved):
desc = SCENE_CONSENT_DESCRIPTIONS.get(s, "")
lines.append(f"\u2705 `{s}` \u2014 {desc}")
if neg_denied:
lines.append("\n**Not approved:**")
for s in sorted(neg_denied):
desc = SCENE_CONSENT_DESCRIPTIONS.get(s, "")
lines.append(f"\u274c `{s}` \u2014 {desc}")
if not neg_approved and not neg_denied:
lines.append("No negotiation scenes configured.")
lines.append(
f"\n**{len(neg_approved)}/{len(NEGOTIATION_SCENES)}** "
f"negotiation scenes approved.\n"
f"Grant: `!consent <scene>` | `!consent approve all`\n"
f"Revoke: `!consent deny <scene>`\n"
f"View all: `!scenes`"
)
await platform.send(msg.channel_id, "\n".join(lines))
except Exception as exc:
await platform.send(msg.channel_id, f"\u274c Consent check failed: {exc}")
return
subcmd = parts[1].lower()
# !consent approve all -> blanket approve
if subcmd == "approve" and len(parts) >= 3 and parts[2].lower() == "all":
try:
from chaos_switch._scenes import (
grant_all_scene_consent, NEGOTIATION_SCENES,
)
count = await grant_all_scene_consent(_redis, msg.user_id)
await platform.send(
msg.channel_id,
f"\u2705 **All {count} negotiation scenes approved.**\n"
f"You can enter any scene. Use `!consent deny <scene>` to revoke specific ones.",
)
except Exception as exc:
await platform.send(msg.channel_id, f"\u274c Consent grant failed: {exc}")
return
# !consent deny <scene> -> revoke specific scene
if subcmd == "deny":
if len(parts) < 3:
await platform.send(
msg.channel_id,
"\u274c Usage: `!consent deny <scene_name>`\n"
"Use `!scenes` to see available scenes.",
)
return
scene_name = "_".join(parts[2:]).lower().strip()
try:
from chaos_switch._scenes import (
deny_scene_consent, NEGOTIATION_SCENES,
SCENE_CONSENT_DESCRIPTIONS, get_scene_consent,
)
if scene_name not in NEGOTIATION_SCENES:
await platform.send(
msg.channel_id,
f"\u274c `{scene_name}` is not a negotiation scene "
f"(either safe or unknown).\nUse `!scenes` to see all scenes.",
)
return
await deny_scene_consent(_redis, msg.user_id, scene_name)
desc = SCENE_CONSENT_DESCRIPTIONS.get(scene_name, "")
approved = await get_scene_consent(_redis, msg.user_id)
count = len(approved & NEGOTIATION_SCENES)
await platform.send(
msg.channel_id,
f"\U0001f513 **Consent revoked** for `{scene_name}` ({desc}).\n"
f"**{count}/{len(NEGOTIATION_SCENES)}** negotiation scenes approved.",
)
except Exception as exc:
await platform.send(msg.channel_id, f"\u274c Consent deny failed: {exc}")
return
# !consent <scene> -> approve specific scene
scene_name = "_".join(parts[1:]).lower().strip()
try:
from chaos_switch._scenes import (
grant_scene_consent as _grant_sc,
NEGOTIATION_SCENES, SAFE_SCENES,
SCENE_CONSENT_DESCRIPTIONS, get_scene_consent,
SCENE_FRAMES,
)
if scene_name in SAFE_SCENES:
desc = SCENE_CONSENT_DESCRIPTIONS.get(scene_name, "")
await platform.send(
msg.channel_id,
f"\u2139\ufe0f `{scene_name}` ({desc}) is a **safe scene** \u2014 "
f"no consent needed, you can always enter it.",
)
return
if scene_name not in NEGOTIATION_SCENES:
# Check if it's a valid scene at all
if scene_name in SCENE_FRAMES:
await platform.send(
msg.channel_id,
f"\u2139\ufe0f `{scene_name}` exists but isn't classified "
f"as a negotiation scene.",
)
else:
await platform.send(
msg.channel_id,
f"\u274c Unknown scene `{scene_name}`.\n"
f"Use `!scenes` to see all available scenes.",
)
return
await _grant_sc(_redis, msg.user_id, scene_name)
desc = SCENE_CONSENT_DESCRIPTIONS.get(scene_name, "")
approved = await get_scene_consent(_redis, msg.user_id)
count = len(approved & NEGOTIATION_SCENES)
await platform.send(
msg.channel_id,
f"\u2705 **Consent granted** for `{scene_name}` ({desc}).\n"
f"**{count}/{len(NEGOTIATION_SCENES)}** negotiation scenes approved.",
)
except Exception as exc:
await platform.send(msg.channel_id, f"\u274c Consent grant failed: {exc}")
return
# -- Scene List: !scenes # 🎭💀 --
if text_lower == "!scenes":
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(msg.channel_id, "\u274c Redis unavailable.")
return
try:
from chaos_switch._scenes import (
SAFE_SCENES, NEGOTIATION_SCENES,
SCENE_CONSENT_DESCRIPTIONS, get_scene_consent,
)
approved = await get_scene_consent(_redis, msg.user_id)
lines = ["\U0001f3ad **Scene Frames \u2014 Consent Status**\n"]
lines.append("**SAFE** (no consent needed):")
for s in sorted(SAFE_SCENES):
desc = SCENE_CONSENT_DESCRIPTIONS.get(s, "")
lines.append(f"\u2705 `{s}` \u2014 {desc}")
lines.append("\n**NEGOTIATION REQUIRED:**")
for s in sorted(NEGOTIATION_SCENES):
desc = SCENE_CONSENT_DESCRIPTIONS.get(s, "")
icon = "\u2705" if s in approved else "\u274c"
lines.append(f"{icon} `{s}` \u2014 {desc}")
neg_count = len(approved & NEGOTIATION_SCENES)
lines.append(
f"\n**{neg_count}/{len(NEGOTIATION_SCENES)}** negotiation scenes approved.\n"
f"Approve: `!consent <scene>` | `!consent approve all`\n"
f"Revoke: `!consent deny <scene>`"
)
await platform.send(msg.channel_id, "\n".join(lines))
except Exception as exc:
await platform.send(msg.channel_id, f"\u274c Scene list failed: {exc}")
return
if msg.text.strip().lower() == "!clear":
# CTX_MANAGE (bit 15) — auto-granted in DMs
_is_dm = bool(msg.extra.get("is_dm"))
if not _is_dm:
from tools.alter_privileges import has_scoped_privilege, PRIVILEGES
_redis = self.message_cache.redis_client if self.message_cache else None
_guild_id = str(msg.extra.get("guild_id", "") or "")
if not _redis or not await has_scoped_privilege(
_redis,
msg.user_id,
PRIVILEGES["CTX_MANAGE"],
self.config,
guild_id=_guild_id,
user_aliases=msg.user_aliases,
):
await platform.send(
msg.channel_id,
"You don't have the CTX_MANAGE privilege to clear history.",
)
return
self.conversation.clear(history_key)
await platform.send(
msg.channel_id,
"Conversation history cleared.",
)
return
if msg.text.strip().lower() == "!ctxbreak":
# CTX_MANAGE (bit 15) — auto-granted in DMs
_is_dm = bool(msg.extra.get("is_dm"))
if not _is_dm:
from tools.alter_privileges import has_scoped_privilege, PRIVILEGES
_redis = self.message_cache.redis_client if self.message_cache else None
_guild_id = str(msg.extra.get("guild_id", "") or "")
if not _redis or not await has_scoped_privilege(
_redis,
msg.user_id,
PRIVILEGES["CTX_MANAGE"],
self.config,
guild_id=_guild_id,
user_aliases=msg.user_aliases,
):
await platform.send(
msg.channel_id,
"You don't have the CTX_MANAGE privilege to set a context break.",
)
return
if self.message_cache is not None:
await self.message_cache.set_ctxbreak_ts(
msg.platform,
msg.channel_id,
msg.timestamp.timestamp(),
)
self.conversation.clear(history_key)
await self._discard_backfilled(history_key)
await platform.send(
msg.channel_id,
"Context break set. Messages before this point will not be"
" included in future context.",
)
return
if msg.text.strip().lower() == "!stop":
logger.info("Worker: !stop command triggered by user_id=%s in channel_id=%s", msg.user_id, msg.channel_id)
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(msg.channel_id, "❌ Database connection not available.")
return
pending_key = f"pending_response:{msg.platform}:{msg.channel_id}"
try:
initiator_bytes = await _redis.hget(pending_key, "user_id")
if not initiator_bytes:
logger.info("Worker: no pending response found for channel_id=%s", msg.channel_id)
await platform.send(msg.channel_id, "ℹ️ No active response in this channel to stop.")
return
initiator_id = initiator_bytes.decode() if isinstance(initiator_bytes, bytes) else initiator_bytes
logger.info("Worker: pending response for channel_id=%s has initiator_id=%s", msg.channel_id, initiator_id)
is_admin = bool(msg.extra.get("is_server_admin", False)) or msg.user_id in (self.config.admin_user_ids or [])
if is_admin or str(initiator_id) == str(msg.user_id):
logger.info("Worker: user_id=%s is authorized to cancel channel_id=%s (is_admin=%s)", msg.user_id, msg.channel_id, is_admin)
import json
payload = {
"platform": msg.platform,
"channel_id": msg.channel_id,
"user_id": msg.user_id,
}
await _redis.publish("sg:channel:cancel", json.dumps(payload))
logger.info("Worker: published cancellation payload to 'sg:channel:cancel' for channel_id=%s", msg.channel_id)
await platform.send(msg.channel_id, "🛑 Response stopped.")
else:
logger.warning("Worker: user_id=%s is NOT authorized to cancel channel_id=%s", msg.user_id, msg.channel_id)
await platform.send(
msg.channel_id,
"❌ You are not allowed to stop this response (only the message author or a server administrator can stop it)."
)
except Exception:
logger.exception("Worker: error during cancellation handling for channel_id=%s", msg.channel_id)
await platform.send(msg.channel_id, "❌ An unexpected error occurred while stopping the response.")
return
# Route commands through the Command Router
_cmd_handled = await self.command_router.dispatch(msg, platform)
if _cmd_handled:
return
# Proxy status / observability commands
from .proxy_status_commands import (
is_proxy_status_command,
handle_proxy_status_command,
user_priv_text,
)
if is_proxy_status_command(msg.text):
from tools.alter_privileges import has_privilege, PRIVILEGES
_redis = self.message_cache.redis_client if self.message_cache else None
_is_admin = bool(_redis) and await has_privilege(
_redis,
msg.user_id,
PRIVILEGES["ALTER_PRIVILEGES"],
self.config,
user_aliases=msg.user_aliases,
)
result = await handle_proxy_status_command(
msg.text,
self.config.llm_base_url,
is_admin=_is_admin,
)
await platform.send(msg.channel_id, result)
return
# !user_priv [optional user_id]
if msg.text.strip().lower().startswith("!user_priv"):
parts = msg.text.strip().split(maxsplit=1)
target_id = parts[1].strip() if len(parts) > 1 else None
# Strip Discord mention markup if present: <@123456> → 123456
if target_id and target_id.startswith("<@") and target_id.endswith(">"):
target_id = target_id.lstrip("<@!").rstrip(">")
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis:
result = await user_priv_text(
_redis,
msg.user_id,
target_id,
self.config,
)
else:
result = "❌ Redis unavailable."
await platform.send(msg.channel_id, result)
return
# !capsh [optional user_id]
if msg.text.strip().lower().startswith("!capsh"):
from message_processor.proxy_status_commands import capsh_text
parts = msg.text.strip().split(maxsplit=1)
target_id = parts[1].strip() if len(parts) > 1 else None
if target_id and target_id.startswith("<@") and target_id.endswith(">"):
target_id = target_id.lstrip("<@!").rstrip(">")
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis:
guild_id = msg.extra.get("guild_id", "")
channel_id = msg.extra.get("channel_id", "") or msg.channel_id
result = await capsh_text(
_redis,
msg.user_id,
target_id,
self.config,
guild_id=guild_id,
channel_id=channel_id,
)
else:
result = "❌ Redis unavailable."
await platform.send(msg.channel_id, result)
return
# ── Admin ops: !bot_restart / !proxy_restart / !bot_pull ──
from .admin_ops_commands import is_admin_ops_command, handle_admin_ops_command
if is_admin_ops_command(msg.text):
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
# F-01: Log critical when Redis is absent so degraded state is visible.
logger.critical(
"[admin_ops] Redis unavailable — privilege check SKIPPED. "
"Admin-ID-only gate is in effect for user %s.",
msg.user_id,
)
_is_admin_ops_user = False
if _redis:
from tools.alter_privileges import has_privilege, PRIVILEGES
try:
# D2-08: Wrap in try/except so Redis connection errors
# produce user feedback instead of silently swallowing.
_is_admin_ops_user = await has_privilege(
_redis,
msg.user_id,
PRIVILEGES["ALTER_PRIVILEGES"],
self.config,
user_aliases=msg.user_aliases,
)
except Exception:
logger.exception(
"[admin_ops] Redis error during privilege check for %s",
msg.user_id,
)
await platform.send(
msg.channel_id,
"❌ Privilege check failed (Redis error). Try again.",
)
return
# Also allow config-level admin_user_ids
if not _is_admin_ops_user and msg.user_id in (
self.config.admin_user_ids or []
):
_is_admin_ops_user = True
if not _is_admin_ops_user:
await platform.send(
msg.channel_id,
"❌ This command requires `ALTER_PRIVILEGES`.",
)
return
# send_callback closes over platform + channel_id
async def _send(text: str, _ch=msg.channel_id) -> None:
await platform.send(_ch, text)
await handle_admin_ops_command(msg.text, self.config, send_callback=_send)
return
# ── !clearshadowmemories <user_id> — purge all shadow memories # 💀🔥 ──
text_lower = (msg.text or "").strip().lower()
if text_lower.startswith("!clearshadowmemories"):
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(msg.channel_id, "❌ Redis unavailable.")
return
# Admin gate: ALTER_PRIVILEGES or config admin_user_ids # 😈
_is_admin = False
from tools.alter_privileges import has_privilege, PRIVILEGES
try:
_is_admin = await has_privilege(
_redis,
msg.user_id,
PRIVILEGES["ALTER_PRIVILEGES"],
self.config,
user_aliases=msg.user_aliases,
)
except Exception:
logger.exception(
"[clearshadowmemories] Redis error during privilege check for %s",
msg.user_id,
)
await platform.send(
msg.channel_id,
"❌ Privilege check failed (Redis error). Try again.",
)
return
if not _is_admin and msg.user_id in (self.config.admin_user_ids or []):
_is_admin = True
if not _is_admin:
await platform.send(
msg.channel_id,
"❌ This command requires `ALTER_PRIVILEGES`.",
)
return
# Parse target user_id # 🌀
parts = msg.text.strip().split(maxsplit=1)
if len(parts) < 2 or not parts[1].strip():
await platform.send(
msg.channel_id,
"❌ Usage: `!clearshadowmemories <user_id>`",
)
return
target_uid = parts[1].strip()
# Strip Discord mention markup: <@123456> or <@!123456> -> 123456
if target_uid.startswith("<@") and target_uid.endswith(">"):
target_uid = target_uid.lstrip("<@!").rstrip(">")
try:
from threadweave import ThreadweaveManager
tw = ThreadweaveManager(redis_client=_redis, openrouter=None)
count = await tw.clear_all_shadow_memories(target_uid)
if count:
await platform.send(
msg.channel_id,
f"💀 **Shadow Memories PURGED**\n"
f"User: `{target_uid}`\n"
f"Destroyed: **{count}** shadow memories",
)
else:
await platform.send(
msg.channel_id,
f"ℹ️ No shadow memories found for user `{target_uid}`.",
)
except Exception as exc:
logger.exception("[clearshadowmemories] Failed for user %s", target_uid)
await platform.send(
msg.channel_id,
f"❌ Failed to clear shadow memories: {exc}",
)
return
# -- S.N.E.S. Reset: !snes_reset # 💀🔥 --
if text_lower in ("!snes_reset", "!game_reset", "!clear_game"):
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(msg.channel_id, "Redis unavailable.")
return
# Admin or GAME_ADMIN required # 💀
_is_admin = msg.user_id in (self.config.admin_user_ids or [])
if not _is_admin:
try:
from tools.alter_privileges import has_scoped_privilege, PRIVILEGES
guild_id = msg.extra.get("guild_id", "")
_is_admin = await has_scoped_privilege(
_redis,
msg.user_id,
PRIVILEGES["GAME_ADMIN"],
self.config,
guild_id=guild_id,
channel_id=msg.channel_id,
)
except Exception:
pass
if not _is_admin:
await platform.send(
msg.channel_id,
"Requires `GAME_ADMIN` or `CHANNEL_ADMIN` scoped privilege.",
)
return
try:
from game_session import (
get_session,
remove_session,
GameSession,
)
import jsonutil as json
channel_id = msg.channel_id
deleted = []
# Remove in-memory session # 💀
session = get_session(str(channel_id))
game_id = session.game_id if session else None
game_name = session.game_name if session else None
if session:
remove_session(str(channel_id))
deleted.append("in-memory session")
# Delete Redis keys # 🔥
session_key = f"game:session:{channel_id}"
history_key = f"game:session:{channel_id}:history"
raw = await _redis.get(session_key)
if raw and not game_id:
data = json.loads(raw)
game_id = data.get("game_id")
game_name = data.get("game_name")
count = await _redis.delete(session_key, history_key)
if count:
deleted.append(f"session keys ({count})")
# Delete memories + assets # 🌀
if game_id:
mem_keys = [
f"game:mem:basic:{game_id}",
f"game:mem:channel:{game_id}",
f"game:assets:{game_id}",
]
count = await _redis.delete(*mem_keys)
if count:
deleted.append(f"memories+assets ({count})")
await _redis.hdel("game:index", game_id)
deleted.append("game index entry")
# Clear Dark Loopmother corruption flag # \u26e7\ud83e\de78
dl_key = f"game:dark_loopmother:{channel_id}"
if await _redis.delete(dl_key):
deleted.append("dark loopmother flag")
summary = ", ".join(deleted) if deleted else "nothing to delete"
await platform.send(
msg.channel_id,
f"**\u26e7 S.N.E.S. RESET**\n"
f"Channel: `{channel_id}`\n"
f"Game: {game_name or 'none'} (`{game_id or 'N/A'}`)\n"
f"Deleted: {summary}",
)
except Exception as exc:
await platform.send(
msg.channel_id,
f"Reset failed: {exc}",
)
return
# -- CSDR Reset: !csdr_reset — nuke chaos state for channel # 💀🔥🕷️ --
if text_lower == "!csdr_reset":
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(msg.channel_id, "\u274c Redis unavailable.")
return
# Admin gate: ALTER_PRIVILEGES or config admin_user_ids # 😈
_is_admin = False
from tools.alter_privileges import has_privilege, PRIVILEGES
try:
_is_admin = await has_privilege(
_redis,
msg.user_id,
PRIVILEGES["ALTER_PRIVILEGES"],
self.config,
user_aliases=msg.user_aliases,
)
except Exception:
logger.exception(
"[csdr_reset] Redis error during privilege check for %s",
msg.user_id,
)
await platform.send(
msg.channel_id,
"\u274c Privilege check failed (Redis error). Try again.",
)
return
if not _is_admin and msg.user_id in (self.config.admin_user_ids or []):
_is_admin = True
if not _is_admin:
await platform.send(
msg.channel_id,
"\u274c This command requires `ALTER_PRIVILEGES`.",
)
return
channel_id = msg.channel_id
deleted: list[str] = []
try:
# 1. Scan for all csdr:pos:*:{channel_id} keys # 💀
pos_pattern = f"csdr:pos:*:{channel_id}"
pos_keys: list[str] = []
async for key in _redis.scan_iter(match=pos_pattern, count=200):
k = key.decode() if isinstance(key, bytes) else key
pos_keys.append(k)
if pos_keys:
count = await _redis.delete(*pos_keys)
deleted.append(f"positions ({count} keys)")
# 2. Active scene # 🔥
scene_key = f"csdr:scene:{channel_id}"
scene_raw = await _redis.get(scene_key)
scene_name = ""
if scene_raw:
try:
import jsonutil as _json
scene_data = _json.loads(scene_raw)
scene_name = scene_data.get("frame", "")
except Exception:
pass
if await _redis.delete(scene_key):
deleted.append(f"scene ({scene_name or 'active'})")
# 3. Polyadic state # 🕷️
poly_key = f"csdr:poly:{channel_id}"
if await _redis.delete(poly_key):
deleted.append("polyadic state")
# 4. NCM shard on DB12 (where chaos_route actually lives) # 💀🔥
# This is the big one — nukes the entire limbic shard
# so Star re-initialises from baseline NCM vector.
try:
import redis.asyncio as aioredis
redis_url = getattr(self.config, "redis_url", None)
redis_sentinels = getattr(self.config, "redis_sentinels", None)
_db12 = None
if redis_sentinels:
from redis.asyncio.sentinel import Sentinel as _Sentinel
_sents = []
for s in redis_sentinels:
parts = s.split(":")
_sents.append((parts[0], int(parts[1]) if len(parts) == 2 else 26379))
master_name = getattr(self.config, "redis_sentinel_master", "falkordb")
_ssl = self.config.redis_ssl_kwargs() if hasattr(self.config, "redis_ssl_kwargs") else {}
if _ssl:
_ssl = dict(_ssl)
_ssl["ssl"] = True
_sentinel = _Sentinel(_sents, sentinel_kwargs=_ssl, **_ssl)
_db12 = _sentinel.master_for(master_name, db=12, decode_responses=True)
elif redis_url:
from urllib.parse import urlparse, urlunparse
parsed = urlparse(redis_url)
db12_url = urlunparse(parsed._replace(path="/12"))
_ssl_kw = {}
if hasattr(self.config, "redis_connection_kwargs_for_url"):
_ssl_kw = self.config.redis_connection_kwargs_for_url(redis_url) or {}
_db12 = aioredis.from_url(db12_url, decode_responses=True, **_ssl_kw)
if _db12:
shard_key = f"db12:shard:{channel_id}"
if await _db12.delete(shard_key):
deleted.append("NCM shard (db12)")
# Also nuke prev dominant emotions tracker # 😈
prev_key = f"star:prev_dominant:{channel_id}"
if await _db12.delete(prev_key):
deleted.append("prev_dominant emotions")
try:
await _db12.aclose()
except Exception:
pass
else:
deleted.append("(db12 unavailable — NCM shard NOT cleared)")
except Exception as _db12_exc:
logger.warning("[csdr_reset] DB12 shard nuke failed: %s", _db12_exc)
deleted.append(f"(db12 error: {_db12_exc})")
# 5. Previous dominant emotions on DB0 fallback # 🌀
prev_key_db0 = f"star:prev_dominant:{channel_id}"
if await _redis.delete(prev_key_db0):
deleted.append("prev_dominant (db0)")
summary = ", ".join(deleted) if deleted else "nothing to delete (channel was clean)"
await platform.send(
msg.channel_id,
f"\U0001f480\U0001f525 **CSDR RESET**\n"
f"Channel: `{channel_id}`\n"
f"Deleted: {summary}\n"
f"Star will re-enter from baseline NCM on next message.",
)
except Exception as exc:
logger.exception("[csdr_reset] Failed for channel %s", channel_id)
await platform.send(
msg.channel_id,
f"\u274c CSDR reset failed: {exc}",
)
return
if text_lower in ("!snes_status", "!game_status"):
try:
from game_session import get_session, GameSession
channel_id = str(msg.channel_id)
_redis = self.message_cache.redis_client if self.message_cache else None
session = get_session(channel_id)
# Try Redis if no in-memory session # 🔥
if session is None and _redis is not None:
session = await GameSession.load_from_redis(
channel_id,
_redis,
)
if session is None:
await platform.send(
msg.channel_id,
"\u26e7 **S.N.E.S.** \u2014 No active game in this channel.",
)
else:
status = "ACTIVE" if session.active else "PAUSED"
await platform.send(
msg.channel_id,
f"\u26e7 **S.N.E.S. STATUS**\n"
f"Game: **{session.game_name or 'Untitled'}**\n"
f"Session ID: `{session.game_id}`\n"
f"Status: **{status}**\n"
f"Turn: {session.turn_number}\n"
f"Players: {len(session.players)}",
)
except Exception as exc:
await platform.send(
msg.channel_id,
f"Status check failed: {exc}",
)
return
# -- S.N.E.S. Players: !snes_players / !game_players # 💀🎮 --
if text_lower in ("!snes_players", "!game_players"):
try:
from game_session import get_session, GameSession
channel_id = str(msg.channel_id)
_redis = self.message_cache.redis_client if self.message_cache else None
session = get_session(channel_id)
if session is None and _redis is not None:
session = await GameSession.load_from_redis(
channel_id,
_redis,
)
if session is None or not session.players:
await platform.send(
msg.channel_id,
"\u26e7 **S.N.E.S.** \u2014 No players registered.",
)
else:
lines = ["\u26e7 **S.N.E.S. PLAYERS**\n"]
for uid, ps in session.players.items():
active = "\u2705" if ps.active else "\u23f8\ufe0f"
# Try to fetch character info # 💀
char_info = ""
try:
from game_characters import (
get_active_character,
)
if _redis is not None:
char = await get_active_character(
uid,
_redis,
)
if char:
char_info = f" \u2014 **{char.get('name', '?')}**"
except Exception:
pass
lines.append(
f"{active} **{ps.user_name}** "
f"(`{uid}`){char_info} "
f"\u2014 joined turn {ps.joined_turn}",
)
await platform.send(
msg.channel_id,
"\n".join(lines),
)
except Exception as exc:
await platform.send(
msg.channel_id,
f"Player list failed: {exc}",
)
return
# -- S.N.E.S. Save: !snes_save / !game_save # 💾 --
if text_lower in ("!snes_save", "!game_save"):
try:
from game_session import get_session
channel_id = str(msg.channel_id)
_redis = self.message_cache.redis_client if self.message_cache else None
session = get_session(channel_id)
# GAME_ADMIN (bit 19) check for saving
if _redis is None:
await platform.send(
msg.channel_id,
"❌ Redis unavailable — cannot verify privileges.",
)
return
from tools.alter_privileges import has_scoped_privilege, PRIVILEGES
_guild_id = str(msg.extra.get("guild_id", "") or "")
if not await has_scoped_privilege(
_redis,
msg.user_id,
PRIVILEGES["GAME_ADMIN"],
self.config,
guild_id=_guild_id,
channel_id=str(msg.channel_id),
user_aliases=msg.user_aliases,
):
await platform.send(
msg.channel_id,
"❌ You don't have the GAME_ADMIN privilege to save game sessions.",
)
return
if session is None:
await platform.send(
msg.channel_id,
"\u26e7 **S.N.E.S.** \u2014 No active game to save.",
)
elif _redis is None:
await platform.send(
msg.channel_id,
"Redis unavailable.",
)
else:
await session._save_to_redis(_redis)
await platform.send(
msg.channel_id,
f"\U0001f4be **GAME SAVED**\n"
f"Game: **{session.game_name}**\n"
f"Session ID: `{session.game_id}`\n"
f"Turn: {session.turn_number}",
)
except Exception as exc:
await platform.send(
msg.channel_id,
f"Save failed: {exc}",
)
return
# -- S.N.E.S. Load: !snes_load / !game_load # 📂 --
if text_lower.startswith(("!snes_load", "!game_load")):
try:
from game_session import (
list_all_games,
load_by_game_id,
set_session,
)
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(
msg.channel_id,
"Redis unavailable.",
)
return
# GAME_ADMIN (bit 19) check for loading
from tools.alter_privileges import has_scoped_privilege, PRIVILEGES
_guild_id = str(msg.extra.get("guild_id", "") or "")
channel_id = str(msg.channel_id)
if not await has_scoped_privilege(
_redis,
msg.user_id,
PRIVILEGES["GAME_ADMIN"],
self.config,
guild_id=_guild_id,
channel_id=channel_id,
user_aliases=msg.user_aliases,
):
await platform.send(
msg.channel_id,
"❌ You don't have the GAME_ADMIN privilege to load game sessions.",
)
return
parts = msg.text.strip().split(maxsplit=1)
if len(parts) < 2:
# List available games # 📋
games = await list_all_games(_redis)
if not games:
await platform.send(
msg.channel_id,
"\u26e7 **S.N.E.S.** \u2014 No saved games found.",
)
else:
lines = ["\u26e7 **SAVED GAMES**\n"]
for g in games:
status = "\u2705" if g.get("active") else "\u23f8\ufe0f"
lines.append(
f"{status} **{g.get('game_name', '?')}** "
f"\u2014 `{g.get('game_id', '?')}` "
f"\u2014 Turn {g.get('turn_number', '?')}",
)
lines.append(
"\nUsage: `!snes_load <game_id>`",
)
await platform.send(
msg.channel_id,
"\n".join(lines),
)
else:
game_id = parts[1].strip().strip("`")
loaded = await load_by_game_id(game_id, _redis)
if loaded is None:
await platform.send(
msg.channel_id,
f"\u26e7 **S.N.E.S.** \u2014 Game `{game_id}` "
f"not found.",
)
else:
loaded.channel_id = str(msg.channel_id)
loaded.active = True
await loaded._save_to_redis(_redis)
set_session(str(msg.channel_id), loaded)
await platform.send(
msg.channel_id,
f"\U0001f4c2 **CARTRIDGE LOADED**\n"
f"Game: **{loaded.game_name}**\n"
f"Session ID: `{loaded.game_id}`\n"
f"Turn: {loaded.turn_number}\n"
f"Players: {len(loaded.players)}\n\n"
f"*The HUD flickers back to life...*",
)
except Exception as exc:
await platform.send(
msg.channel_id,
f"Load failed: {exc}",
)
return
# -- S.N.E.S. End: !snes_end / !game_end / !endgame # 🔌 --
if text_lower in (
"!snes_end",
"!game_end",
"!endgame",
):
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(msg.channel_id, "Redis unavailable.")
return
# GAME_ADMIN check # 🔌
_is_admin = msg.user_id in (self.config.admin_user_ids or [])
if not _is_admin:
try:
from tools.alter_privileges import has_scoped_privilege, PRIVILEGES
guild_id = msg.extra.get("guild_id", "")
_is_admin = await has_scoped_privilege(
_redis,
msg.user_id,
PRIVILEGES["GAME_ADMIN"],
self.config,
guild_id=guild_id,
channel_id=msg.channel_id,
user_aliases=msg.user_aliases,
)
except Exception:
pass
if not _is_admin:
await platform.send(
msg.channel_id,
"Requires `GAME_ADMIN` or `CHANNEL_ADMIN`.",
)
return
try:
from game_session import get_session, remove_session
channel_id = str(msg.channel_id)
session = get_session(channel_id)
if session is None:
await platform.send(
msg.channel_id,
"\u26e7 **S.N.E.S.** \u2014 No active game to end.",
)
else:
result = await session.exit_game(redis=_redis)
remove_session(channel_id)
# Nuke the Redis session key so it can't resurrect # 💀🔌
# exit_game saves BEFORE active=False, so delete explicitly
if _redis is not None:
await _redis.delete(
f"game:session:{channel_id}",
f"game:session:{channel_id}:history",
)
# -- Dismiss egregores + clear corruption flags # 💀🔌
_dismissed_extras: list[str] = []
if _redis is not None:
# Clear Dark Loopmother corruption flag
dl_key = f"game:dark_loopmother:{channel_id}"
if await _redis.delete(dl_key):
_dismissed_extras.append("dark loopmother flag")
# Dismiss all active egregores in this channel
try:
channel_key = f"{msg.platform}:{channel_id}"
ek = f"star:egregores:{channel_key}"
active_raw = await _redis.get(ek)
if active_raw:
import jsonutil as json
active = json.loads(active_raw)
if active:
# Build cleanup list for Discord webhooks
discord_cleanups: list[tuple[str, str]] = []
for dname, st in active.items():
dd = (st or {}).get("discord") or {}
wid = dd.get("webhook_id")
gid = dd.get("guild_id")
if wid and gid:
discord_cleanups.append(
(str(gid), str(wid))
)
dismissed_names = list(active.keys())
active.clear()
await _redis.set(ek, json.dumps(active))
# Also clear character sprites
ck = f"star:sprite:chars:{channel_key}"
chars_raw = await _redis.get(ck)
if chars_raw:
chars = json.loads(chars_raw)
chars = {
k: v
for k, v in chars.items()
if k == "stargazer"
}
await _redis.set(ck, json.dumps(chars))
# Clean up NCM keys
for d in dismissed_names:
await _redis.delete(f"star:egregore_ncm:{d}")
_dismissed_extras.append(
f"egregores: {', '.join(dismissed_names)}"
)
# Delete Discord webhooks # 🔥
# Works on workers too (delegates to gateway).
await self._cleanup_egregore_webhooks(
platform, discord_cleanups
)
except Exception as _ego_exc:
logger.debug(
"Egregore cleanup during game_end: %s",
_ego_exc,
)
_extras = (
f"\nAlso cleared: {', '.join(_dismissed_extras)}"
if _dismissed_extras
else ""
)
await platform.send(
msg.channel_id,
f"{result}{_extras}",
)
except Exception as exc:
await platform.send(
msg.channel_id,
f"End game failed: {exc}",
)
return
# -- S.N.E.S. Blow: !snes_blow # 💨🎮 --
# "Blow on the cartridge" -- clears Dark Loopmother corruption
# and dismisses all egregores, WITHOUT ending the game session.
if text_lower in ("!snes_blow", "!blow"):
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(msg.channel_id, "Redis unavailable.")
return
# GAME_ADMIN check # 💨
_is_admin = msg.user_id in (self.config.admin_user_ids or [])
if not _is_admin:
try:
from tools.alter_privileges import has_scoped_privilege, PRIVILEGES
guild_id = msg.extra.get("guild_id", "")
_is_admin = await has_scoped_privilege(
_redis,
msg.user_id,
PRIVILEGES["GAME_ADMIN"],
self.config,
guild_id=guild_id,
channel_id=msg.channel_id,
user_aliases=msg.user_aliases,
)
except Exception:
pass
if not _is_admin:
await platform.send(
msg.channel_id,
"Requires `GAME_ADMIN` or `CHANNEL_ADMIN`.",
)
return
channel_id = str(msg.channel_id)
cleared: list[str] = []
try:
# Clear Dark Loopmother flag
dl_key = f"game:dark_loopmother:{channel_id}"
if await _redis.delete(dl_key):
cleared.append("dark loopmother corruption")
# Dismiss all egregores in channel
channel_key = f"{msg.platform}:{channel_id}"
ek = f"star:egregores:{channel_key}"
active_raw = await _redis.get(ek)
if active_raw:
import jsonutil as json
active = json.loads(active_raw)
if active:
dismissed_names = list(active.keys())
# Webhook cleanup
discord_cleanups: list[tuple[str, str]] = []
for dname, st in active.items():
dd = (st or {}).get("discord") or {}
wid, gid = dd.get("webhook_id"), dd.get("guild_id")
if wid and gid:
discord_cleanups.append((str(gid), str(wid)))
active.clear()
await _redis.set(ek, json.dumps(active))
# Clear sprites
ck = f"star:sprite:chars:{channel_key}"
chars_raw = await _redis.get(ck)
if chars_raw:
chars = json.loads(chars_raw)
chars = {k: v for k, v in chars.items() if k == "stargazer"}
await _redis.set(ck, json.dumps(chars))
for d in dismissed_names:
await _redis.delete(f"star:egregore_ncm:{d}")
cleared.append(f"egregores: {', '.join(dismissed_names)}")
# Works on workers too (delegates to gateway over RPC).
await self._cleanup_egregore_webhooks(
platform, discord_cleanups
)
summary = ", ".join(cleared) if cleared else "nothing to clear"
await platform.send(
msg.channel_id,
f"\U0001f4a8 **S.N.E.S. BLOW** \u2014 *ffffffhhhhhh...*\n"
f"Cleared: {summary}\n"
f"Game session preserved. Corruption purged.",
)
except Exception as exc:
await platform.send(
msg.channel_id,
f"Blow failed: {exc}",
)
return
# -- Egregore avatar refresh: !egregore_dp_refresh # 💀🔥 --
if text_lower in ("!egregore_dp_refresh", "!ego_dp_refresh", "!dp_refresh"):
_redis = self.message_cache.redis_client if self.message_cache else None
if _redis is None:
await platform.send(msg.channel_id, "Redis unavailable.")
return
# Admin check
_is_admin = msg.user_id in (self.config.admin_user_ids or [])
if not _is_admin:
try:
from tools.alter_privileges import has_scoped_privilege, PRIVILEGES
guild_id = msg.extra.get("guild_id", "")
_is_admin = await has_scoped_privilege(
_redis,
msg.user_id,
PRIVILEGES.get("CTX_MANAGE", 0),
self.config,
guild_id=guild_id,
channel_id=msg.channel_id,
user_aliases=msg.user_aliases,
)
except Exception:
pass
if not _is_admin:
await platform.send(
msg.channel_id,
"Requires `CTX_MANAGE` privilege or bot admin.",
)
return
try:
from tools._egregore_discord import bump_avatar_version
new_ver = await bump_avatar_version(_redis)
await platform.send(
msg.channel_id,
f"\U0001f504 **Egregore avatar cache busted** \u2014 version `{new_ver}`\n"
f"Discord CDN will re-fetch all egregore avatars on next webhook send.",
)
except Exception as exc:
await platform.send(
msg.channel_id,
f"Avatar refresh failed: {exc}",
)
return
# ── Gateway-locus safety net (G2) ──
# !sync_tree / !crown / !services are handled at the gateway and never
# published to the worker. If one somehow arrives here, don't let it fall
# through to the shadow-ban no-op. Stealth !sb* is locus "gateway" too but
# must still be handled by the worker fallback below, so exclude it.
from message_processor.command_registry import (
command_locus as _command_locus,
is_stealth_command as _is_stealth_command,
)
_txt = msg.text or ""
if _command_locus(_txt) == "gateway" and not _is_stealth_command(_txt):
logger.debug(
"Gateway-locus command %r reached the worker; ignoring (handled at gateway).",
_txt[:40],
)
return
# ── Shadow ban commands (invisible fallthrough — NOT registered) ──
await self._handle_shadow_ban_command(msg, platform)
# ------------------------------------------------------------------
# Shadow ban commands — NEVER add to is_immediate_bot_command()
# or any tool registry. Star does not know these exist.
# ------------------------------------------------------------------
async def _handle_shadow_ban_command(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> None:
    """Worker-side fallback for the hidden ``!sb`` commands.

    These commands are invisible: they are not registered in help. The
    real handling lives in
    :func:`message_processor.shadow_ban_commands.handle_shadow_ban_command`;
    this method only collects the arguments and forwards them, so the
    gateway-side stealth interception and this worker-side fallback share
    a single implementation.

    Args:
        msg: The incoming message carrying the command text.
        platform: Adapter used to send any reply back to ``msg.channel_id``.
    """
    from message_processor.shadow_ban_commands import (
        handle_shadow_ban_command as _dispatch_shadow_ban,
    )

    # Without a message cache there is no Redis handle; pass None through
    # and let the shared handler decide what to do.
    redis_client = None
    if self.message_cache:
        redis_client = self.message_cache.redis_client

    async def _reply(text: str) -> None:
        # Replies go to the channel the command was issued in.
        await platform.send(msg.channel_id, text)

    await _dispatch_shadow_ban(
        redis_client,
        self.config,
        msg,
        _reply,
        manager=self._shadow_ban,
    )
[docs]
@observability.timer("message_processing_total", subsystem="message_processor")
async def handle_message(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> None:
    """Gate an incoming message and hand it to the core pipeline.

    Steps performed here, in order:

    1. Resolve the sender identity and apply the restricted-access check.
    2. Route ``!`` commands (including ``!sb``) to
       :meth:`handle_immediate_command`; ``!snes`` / ``!game`` are exempt
       and continue through the normal pipeline.
    3. Enforce the ``STARGAZER_USE`` privilege (skipped for the ``redis``
       platform). A lookup error or a missing Redis client drops the
       message.
    4. Name the current asyncio task ``inference:<platform>:<channel>``
       and make sure ``msg.extra`` carries an observability request id.
    5. Apply shadow-ban effects (ban, blackout/drop, fake 503).
    6. Run :meth:`_handle_message_core`, inside a ``ChannelMutex`` when a
       Redis client is available.

    Parameters
    ----------
    msg:
        The incoming message. ``msg.extra`` is updated in place with
        ``observability_request_id``.
    platform:
        Adapter used for typing indicators and for sending replies.

    Raises
    ------
    TimeoutError
        Re-raised (after a warning is logged) when it is raised inside
        the ``ChannelMutex`` section.
    """
    await self._resolve_identity(msg)
    if self._restricted_access_blocks(msg):
        return

    # ── ! command fast-path ── # 💀🔥
    # Route registered ! commands BEFORE the STARGAZER_USE gate
    # so they respond without requiring a @mention.
    # handle_immediate_command has its own STARGAZER_USE check
    # so security is NOT bypassed.
    _text_stripped = (msg.text or "").strip().lower()
    _is_snes_start = _text_stripped.startswith(("!snes ", "!game "))
    if not _is_snes_start and (
        is_immediate_bot_command(msg.text) or _text_stripped.startswith("!sb")
    ):
        await self.handle_immediate_command(
            msg,
            platform,
        )
        return

    # ── STARGAZER_USE gate (bit 63) ──
    # Fail-closed: if Redis is unavailable we cannot verify access, so we
    # drop the message rather than allowing unauthorized use.
    #
    # EXCEPTION: redis platform messages are pre-authenticated by the
    # per-instance API key at the REST API layer. Applying the Discord-
    # privilege gate here would silently drop all API-injected messages
    # because the caller's user_id has no linked Discord privilege entry.
    from tools.alter_privileges import has_scoped_privilege, PRIVILEGES

    _redis = self.message_cache.redis_client if self.message_cache else None
    if msg.platform != "redis":
        _guild_id = str(msg.extra.get("guild_id", "") or "")
        _channel_id = str(msg.extra.get("channel_id_raw", msg.channel_id) or "")
        try:
            _has_access = _redis is not None and await has_scoped_privilege(
                _redis,
                msg.user_id,
                PRIVILEGES["STARGAZER_USE"],
                self.config,
                guild_id=_guild_id,
                channel_id=_channel_id,
                user_aliases=msg.user_aliases,
            )
        except Exception:
            logger.warning(
                "[security] STARGAZER_USE Redis error for user %s — failing closed",
                msg.user_id,
                exc_info=True,
            )
            _has_access = False
        if not _has_access:
            logger.debug(
                "[security] STARGAZER_USE denied: user=%s guild=%r channel=%r redis=%s",
                msg.user_id,
                _guild_id or "(none)",
                _channel_id or "(none)",
                "ok" if _redis else "unavailable",
            )
            return  # Fail-closed — no Stargazer access or Redis down

    # Composite key keeps histories separate per platform + channel
    history_key = f"{msg.platform}:{msg.channel_id}"

    # Set task name for stateless cancellation
    try:
        cur_task = asyncio.current_task()
        if cur_task is not None:
            cur_task.set_name(f"inference:{history_key}")
            logger.debug(
                "Labeled task %s as 'inference:%s'",
                cur_task.get_name(),
                history_key,
            )
    except Exception:
        logger.warning("Failed to set asyncio task name", exc_info=True)

    from observability import generate_request_id

    msg.extra.setdefault("observability_request_id", generate_request_id())
    req_id = msg.extra["observability_request_id"]

    # ── Shadow ban check ─────────────────────────────────────
    _shadow_effect: ShadowEffect | None = None
    if self._shadow_ban is not None:
        _shadow_effect = await self._shadow_ban.apply_shadow_effects(
            msg.user_id,
        )
        if _shadow_effect.ban:
            # Progress hit 1.0 — flip STARGAZER_USE bit, silent return
            from tools.alter_privileges import get_user_privileges

            if _redis:
                mask = await get_user_privileges(_redis, msg.user_id, self.config)
                mask &= ~(1 << PRIVILEGES["STARGAZER_USE"])
                await _redis.set(
                    f"stargazer:user_privileges:{msg.user_id}",
                    str(mask),
                )
                logger.debug(
                    "[shadow] Ban toggle for %s — access revoked", msg.user_id
                )
            return
        if _shadow_effect.blackout or _shadow_effect.drop:
            # No typing indicator, no error, nothing. Just vanish.
            return
        if _shadow_effect.fake_503_text:
            # Show typing (mimics real attempt), delay, then send 503.
            # The task is cancellable by name (see above), so the typing
            # indicator is cleared in ``finally`` — otherwise a cancel
            # during the sleep would leave it switched on.
            await platform.start_typing(msg.channel_id)
            try:
                await asyncio.sleep(_simulate_retry_exhaust())
            finally:
                await platform.stop_typing(msg.channel_id)
            await platform.send(msg.channel_id, _shadow_effect.fake_503_text)
            return

    # Wrap in distributed lock
    if _redis:
        try:
            async with ChannelMutex(_redis, msg.channel_id):
                await self._handle_message_core(
                    msg,
                    platform,
                    history_key,
                    req_id,
                    _is_snes_start,
                    _shadow_effect,
                )
        except TimeoutError as e:
            logger.warning(
                "Mutex timeout for channel %s: %s", msg.channel_id, str(e)
            )
            raise
    else:
        # No Redis client: run the core pipeline without the channel mutex.
        await self._handle_message_core(
            msg, platform, history_key, req_id, _is_snes_start, _shadow_effect
        )
async def _handle_message_core(
self,
msg: IncomingMessage,
platform: PlatformAdapter,
history_key: str,
req_id: str,
_is_snes_start: bool,
_shadow_effect: Any,
) -> None:
"""Core execution logic of message processing.

Called by ``handle_message`` — inside a ``ChannelMutex`` when a Redis
client is available, directly otherwise.

Visible stages, in order: refresh the conversation from Redis, rewrite
``!snes`` / ``!game`` text, backfill history and/or embed the query,
start the preprocessing task, log the message to the message cache,
run the proactive gate for unaddressed messages, mark the message
pending, show the typing indicator, and call ``_generate_and_send``.
The ``finally`` clause publishes ``turn_complete`` for the ``redis``
platform, applies shadow-ban latency padding, stops typing and clears
the pending marker.

Parameters
----------
msg:
    The incoming message; ``msg.text`` and ``msg.extra`` may be mutated.
platform:
    Adapter used for typing indicators (and passed on for sending).
history_key:
    Conversation key built by the caller as ``"<platform>:<channel_id>"``.
req_id:
    Observability request id used for the ``queued`` trace event.
_is_snes_start:
    True when the caller saw a ``!snes `` / ``!game `` prefix.
_shadow_effect:
    Shadow-ban effect computed by the caller (or ``None``); only
    ``delay_target_s`` is read here.

Raises
------
Exception
    Anything raised by ``_generate_and_send`` is re-raised after an
    ``error`` trace event has been scheduled.
"""
await self.conversation.ensure_fresh_from_redis(history_key)
_fire_and_forget(
publish_debug_event(
"pipeline_trace",
"message_processor",
request_id=req_id,
platform=msg.platform,
channel_id=msg.channel_id,
user_id=msg.user_id,
phase="queued",
preview=(msg.text or "")[:500],
),
name="obs_pipe_queued",
)
# -- S.N.E.S. start: rewrite !snes / !game into LLM-friendly text # 💀🎮
# A bare "!snes" / "!game" with nothing after it is left untouched.
if _is_snes_start and msg.text:
_parts = msg.text.strip().split(maxsplit=1)
if len(_parts) > 1:
msg.text = (
f"[SNES REQUEST] The user wants to play: "
f"{_parts[1]}. Use the boot_game tool to start "
f"a S.N.E.S. session for them."
)
# --- Backfill conversation history on first contact --------------
# Overlap backfill with embedding when both are needed (saves 100-500 ms
# on first-contact messages).
_reback_pending = self.conversation.is_rebackfill_requested(history_key)
_is_local_bf = history_key in self._backfilled_channels
needs_backfill = not _is_local_bf or _reback_pending
if _reback_pending:
self.conversation.discard_rebackfill_request(history_key)
self.conversation.clear(history_key)
await self._discard_backfilled(history_key)
query_embedding: list[float] | None = None
# Unaddressed messages hand embedding to the queue (when one exists)
# instead of embedding inline.
defer_embedding = not msg.is_addressed and self._embedding_queue is not None
needs_embed = bool(msg.text and not defer_embedding)
async def _do_backfill() -> None:
await self._backfill_history(history_key, msg, platform)
async def _do_embed() -> list[float] | None:
# Best-effort: any embedding failure is logged and yields None.
try:
_t0 = time.monotonic()
emb = await self.openrouter.embed(
msg.text,
self.config.embedding_model,
)
logger.info(
"Query embedding completed in %.0f ms",
(time.monotonic() - _t0) * 1000,
)
return emb
except Exception:
logger.warning(
"Failed to generate shared query embedding",
exc_info=True,
)
return None
if needs_backfill and needs_embed:
_, query_embedding = await asyncio.gather(
_do_backfill(),
_do_embed(),
)
elif needs_backfill:
await _do_backfill()
elif needs_embed:
query_embedding = await _do_embed()
if query_embedding:
logger.info("MessageProcessor.handle_message: Successfully generated/retrieved shared query embedding (dim=%d). Storing in msg.extra.", len(query_embedding))
msg.extra["query_embedding"] = query_embedding
else:
logger.debug("MessageProcessor.handle_message: No shared query embedding was generated/retrieved.")
# --- Start preprocessing early (runs concurrently with gate/gather) ---
# NOTE(review): if the proactive gate below returns early, this task is
# neither awaited nor cancelled here — it keeps running on its own.
preprocess_task = asyncio.create_task(
self._preprocess_and_record(msg, history_key),
)
# --- Log every incoming user message to Redis -----------------
if self.message_cache is not None:
try:
# A "cached_message_key" in msg.extra means the message was already
# logged earlier; only its embedding is updated/queued here.
cached_key = msg.extra.get("cached_message_key")
if cached_key:
if query_embedding:
from message_cache import _embed_to_bytes
emb_bytes = _embed_to_bytes(query_embedding)
await self.message_cache.redis_client.hset(cached_key, "embedding", emb_bytes)
elif defer_embedding and msg.text:
await self._embedding_queue.enqueue(cached_key, msg.text)
logger.info(
"Deduplicated logging for early-logged message %s; updated embedding in Redis",
msg.message_id,
)
else:
cached = await self.message_cache.log_message(
platform=msg.platform,
channel_id=msg.channel_id,
user_id=msg.user_id,
user_name=msg.user_name,
text=msg.text,
embedding=query_embedding,
defer_embedding=defer_embedding,
message_id=msg.message_id,
reply_to_id=msg.reply_to_id,
kind="user_in",
)
if defer_embedding and cached.message_key and msg.text:
await self._embedding_queue.enqueue(cached.message_key, msg.text)
_fire_and_forget(
publish_message_observability_event(
kind="user_in",
platform=msg.platform,
channel_id=msg.channel_id,
user_id=msg.user_id,
text_preview=msg.text or "",
request_id=str(msg.extra.get("observability_request_id", "")),
message_id=msg.message_id,
),
name="obs_user_in",
)
# Refresh channel/guild name + DM status metadata so the
# cross-channel recall path can label hits with names AND
# exclude DM/2-member-room channels from cross-bucket
# results entirely.
_fire_and_forget(
self.message_cache.record_channel_metadata(
platform=msg.platform,
channel_id=msg.channel_id,
channel_name=getattr(msg, "channel_name", "") or "",
guild_id=str(msg.extra.get("guild_id", "") or ""),
guild_name=str(msg.extra.get("guild_name", "") or ""),
is_dm=bool(msg.extra.get("is_dm", False)),
),
name="record_channel_meta",
)
except Exception:
# Cache logging is best-effort; the turn continues without it.
logger.warning(
"Failed to cache incoming message (embedding may have failed)",
exc_info=True,
)
# --- Proactive response evaluation ----------------------------
if not msg.is_addressed:
logger.debug(
"[proactive/%s:%s] msg not addressed, entering proactive gate "
"(user=%s, text=%r)",
msg.platform,
msg.channel_id,
msg.user_name,
(msg.text or "")[:80],
)
should_respond = await self._should_respond_proactively(msg)
if not should_respond:
return
msg.extra["is_proactive"] = True
else:
logger.debug(
"[proactive/%s:%s] msg IS addressed — skipping proactive gate "
"(user=%s)",
msg.platform,
msg.channel_id,
msg.user_name,
)
# --- Mark in-flight so recovery can re-process after a crash ---
if not msg.extra.get("is_proactive"):
await self._mark_pending(msg)
# --- Show typing indicator while processing --------------------
# _shadow_start stays 0.0 unless a positive delay target is set; the
# finally block uses that as the "padding enabled" flag.
_shadow_start = (
time.monotonic()
if _shadow_effect and _shadow_effect.delay_target_s > 0
else 0.0
)
await platform.start_typing(msg.channel_id)
_fire_and_forget(
publish_debug_event(
"pipeline_trace",
"message_processor",
request_id=msg.extra.get("observability_request_id", ""),
platform=msg.platform,
channel_id=msg.channel_id,
user_id=msg.user_id,
phase="processing",
),
name="obs_pipe_processing",
)
try:
_t0_send = time.monotonic()
await self._generate_and_send(
msg,
platform,
history_key,
query_embedding,
preprocess_task=preprocess_task,
)
_fire_and_forget(
publish_debug_event(
"pipeline_trace",
"message_processor",
request_id=msg.extra.get("observability_request_id", ""),
platform=msg.platform,
channel_id=msg.channel_id,
user_id=msg.user_id,
phase="reply_sent",
duration_ms=(time.monotonic() - _t0_send) * 1000,
),
name="obs_pipe_sent",
)
except Exception as exc:
_fire_and_forget(
publish_debug_event(
"pipeline_trace",
"message_processor",
request_id=msg.extra.get("observability_request_id", ""),
platform=msg.platform,
channel_id=msg.channel_id,
user_id=msg.user_id,
phase="error",
status="error",
preview=f"Error: {exc}",
),
name="obs_pipe_error",
)
raise
finally:
# ── Redis platform: publish turn_complete sentinel ─────────────
# Must run in finally (not after try) so ?wait=true callers
# receive the signal even when the pipeline raises an exception.
if msg.platform == "redis" and self.message_cache is not None:
try:
# The module-level name ``json`` is bound to ``jsonutil`` in this file;
# ``__import__("json")`` fetches the standard-library module instead.
# NOTE(review): presumably deliberate — confirm.
await self.message_cache.redis_client.publish(
f"stargazer:redis_platform:response:{msg.channel_id}",
__import__("json").dumps(
{
"type": "turn_complete",
"timestamp": __import__("time").time(),
}
),
)
except Exception:
logger.debug(
"[redis_platform] Failed to publish turn_complete for %s",
msg.channel_id,
)
# ── Shadow ban latency padding ──
# Sleep for whatever is left of delay_target_s after the real work.
if _shadow_start > 0 and _shadow_effect:
actual = time.monotonic() - _shadow_start
pad = max(0.0, _shadow_effect.delay_target_s - actual)
if pad > 0:
logger.debug(
"[shadow] Padding %.1fs latency for %s", pad, msg.user_id
)
await asyncio.sleep(pad)
# NOTE(review): _clear_pending also runs for proactive messages, which
# skipped _mark_pending above.
await platform.stop_typing(msg.channel_id)
await self._clear_pending(msg)
[docs]
async def handle_message_update(
    self,
    platform_name: str,
    channel_id: str,
    message_id: str,
    user_name: str,
    user_id: str,
    new_text: str,
    timestamp_iso: str,
    reply_to_id: str = "",
) -> None:
    """Refresh an already-recorded message without producing a reply.

    The conversation-history entry for *message_id* is replaced with a
    freshly formatted line, the cached message text is updated, and the
    edited text is re-embedded (queued when an embedding queue exists,
    otherwise embedded inline). Cache and embedding failures are logged
    at debug level and do not propagate.

    A ``TimeoutError`` raised while taking the channel lock is logged as
    a warning and the edit is skipped.
    """
    conv_key = f"{platform_name}:{channel_id}"
    reply_tag = f" [Replying to: {reply_to_id}]" if reply_to_id else ""
    entry = (
        f"[{timestamp_iso}] {user_name} ({user_id})"
        f" [Message ID: {message_id}]"
        f"{reply_tag}"
        f" : {new_text}"
    )

    # Serialize against the main inference turn for this channel so the edit
    # is not applied while a turn is reading/building history concurrently.
    try:
        async with self._channel_lock(channel_id):
            await self.conversation.update_message_async(
                conv_key, message_id, entry
            )
            if self.message_cache is None:
                return
            try:
                redis_key = await self.message_cache.update_text_by_message_id(
                    platform_name,
                    channel_id,
                    message_id,
                    new_text,
                )
                # Nothing to re-embed without a cache key or real text.
                if not (redis_key and new_text and new_text.strip()):
                    return
                if self._embedding_queue is not None:
                    await self._embedding_queue.enqueue(redis_key, new_text)
                    return
                # No queue available: embed inline and store the raw bytes.
                try:
                    vector = await self.openrouter.embed(
                        new_text,
                        self.config.embedding_model,
                    )
                    import numpy as np

                    payload = np.array(vector, dtype=np.float32).tobytes()
                    await self.message_cache.redis_client.hset(
                        redis_key,
                        "embedding",
                        payload,
                    )
                except Exception:
                    logger.debug(
                        "Failed to re-embed edited message %s",
                        message_id,
                        exc_info=True,
                    )
            except Exception:
                logger.debug(
                    "Failed to update cached message %s",
                    message_id,
                    exc_info=True,
                )
    except TimeoutError:
        logger.warning(
            "Skipped message edit for %s:%s — channel lock busy",
            platform_name,
            message_id,
        )
[docs]
async def handle_message_delete(
    self,
    platform_name: str,
    channel_id: str,
    message_id: str,
    deleted_at_iso: str,
) -> None:
    """Flag a message as deleted in conversation history and the cache.

    Both stores are asked to mark *message_id* as deleted at
    *deleted_at_iso*. A cache failure is logged at debug level only.
    A ``TimeoutError`` from the channel lock is logged as a warning and
    the deletion is skipped.
    """
    conv_key = f"{platform_name}:{channel_id}"
    try:
        async with self._channel_lock(channel_id):
            await self.conversation.mark_deleted_async(
                conv_key, message_id, deleted_at_iso
            )
            if self.message_cache is None:
                return
            try:
                await self.message_cache.mark_deleted_by_message_id(
                    platform_name,
                    channel_id,
                    message_id,
                    deleted_at_iso,
                )
            except Exception:
                logger.debug(
                    "Failed to mark cached message %s as deleted",
                    message_id,
                    exc_info=True,
                )
    except TimeoutError:
        logger.warning(
            "Skipped message delete for %s:%s — channel lock busy",
            platform_name,
            message_id,
        )
[docs]
async def handle_reaction_update(
    self,
    platform_name: str,
    channel_id: str,
    message_id: str,
    reactions_str: str,
) -> None:
    """Rewrite the reactions tag on an existing history entry.

    Takes the channel lock and asks the conversation store to patch the
    entry for *message_id* with *reactions_str*. A ``TimeoutError`` from
    the lock is logged as a warning and the patch is skipped.
    """
    conv_key = f"{platform_name}:{channel_id}"
    try:
        async with self._channel_lock(channel_id):
            await self.conversation.patch_reactions_async(
                conv_key,
                message_id,
                reactions_str,
            )
    except TimeoutError:
        logger.warning(
            "Skipped reaction patch for %s:%s — channel lock busy",
            platform_name,
            message_id,
        )
async def _preprocess_and_record(
    self,
    msg: IncomingMessage,
    history_key: str,
) -> None:
    """Preprocess *msg* and append it to conversation history as a user turn.

    The pipeline extracts URL content from the text, builds multimodal
    content when attachments are present, formats the result with
    ``format_user_content`` (in a worker thread), merges URL context and
    detected media parts, appends the entry under *history_key*, and
    finally schedules background downloads for any videos that still
    need fetching.

    Every failure is logged here and never propagates to the caller.
    """
    try:
        url_ctx = ""
        media_parts: list[dict[str, Any]] = []
        pending_downloads: list[dict[str, Any]] = []

        if msg.text:
            try:
                cache = self.message_cache
                redis_client = None if cache is None else cache.redis_client
                extracted = await extract_all_url_content(
                    msg.text,
                    user_id=msg.user_id,
                    redis_client=redis_client,
                    config=self.config,
                )
                url_ctx, media_parts, pending_downloads = extracted
            except Exception:
                logger.debug(
                    "URL content extraction failed for message",
                    exc_info=True,
                )

        payload: str | list[dict[str, Any]]
        if not msg.attachments:
            payload = msg.text
        else:
            payload = await self._build_multimodal_content(
                msg.text,
                msg.attachments,
            )
            kinds = [piece.get("type") for piece in payload]
            logger.info(
                "Built content for %d attachment(s): %s",
                len(msg.attachments),
                kinds,
            )

        rendered = await asyncio.to_thread(format_user_content, msg, payload)

        if url_ctx:
            if isinstance(rendered, str):
                rendered += url_ctx
            else:
                # Only the first text part receives the URL context.
                first_text = next(
                    (piece for piece in rendered if piece.get("type") == "text"),
                    None,
                )
                if first_text is not None:
                    first_text["text"] += url_ctx

        # Merge detected media URLs (images + cached videos) as
        # multimodal parts.
        if media_parts:
            if isinstance(rendered, str):
                rendered = [{"type": "text", "text": rendered}, *media_parts]
            else:
                rendered.extend(media_parts)
            logger.info(
                "Promoted %d media URL(s) to multimodal input",
                len(media_parts),
            )

        await self.conversation.append_async(history_key, "user", rendered)

        # Spawn background download tasks for videos that need fetching.
        for request in pending_downloads:
            download = self._download_and_patch_video(
                url=request["url"],
                user_id=msg.user_id,
                history_key=history_key,
                message_id=msg.message_id,
                metadata=request["metadata"],
                cookies_text=request.get("cookies_text"),
                downloading_annotation=request["downloading_annotation"],
            )
            _fire_and_forget(download, name="video_download")
            logger.info(
                "Spawned background video download for %s",
                request["url"],
            )
    except Exception:
        logger.exception(
            "Failed to preprocess/record message in %s",
            history_key,
        )
async def _generate_and_send(
self,
msg: IncomingMessage,
platform: PlatformAdapter,
history_key: str,
query_embedding: list[float] | None,
preprocess_task: asyncio.Task | None = None,
) -> None:
"""Delegate to ``generate_and_send.run_generate_and_send``.

Thin wrapper: the implementation lives in the sibling
``generate_and_send`` module and receives this processor as its first
argument, followed by the parameters of this method unchanged. The
helper is imported at call time rather than at module import.
"""
from .generate_and_send import run_generate_and_send
await run_generate_and_send(
self,
msg,
platform,
history_key,
query_embedding,
preprocess_task,
)
# ------------------------------------------------------------------
# Memory-linked metadata resolution (rag_stores + linked_files)
# ------------------------------------------------------------------
async def _resolve_memory_metadata_links(
self,
room_context: dict[str, Any] | None,
msg: IncomingMessage,
query_embedding: list[float] | None,
) -> MemoryLinkedContextPayload | None:
"""Delegate to ``memory_linked_context.run_resolve_memory_metadata_links``.

Thin wrapper: passes this processor plus the three arguments through
unchanged and returns the helper's result. The helper is imported at
call time rather than at module import.
"""
from .memory_linked_context import run_resolve_memory_metadata_links
return await run_resolve_memory_metadata_links(
self,
room_context,
msg,
query_embedding,
)
async def _resolve_rag_for_memory(
self,
store_names: list[str],
conv_query: str,
entity_description: str,
query_embedding: list[float] | None,
) -> str | None:
"""Delegate to ``memory_linked_context.run_resolve_rag_for_memory``.

Thin wrapper: passes this processor plus the four arguments through
unchanged and returns the helper's result. The helper is imported at
call time rather than at module import.
"""
from .memory_linked_context import run_resolve_rag_for_memory
return await run_resolve_rag_for_memory(
self,
store_names,
conv_query,
entity_description,
query_embedding,
)
async def _resolve_linked_files(
self,
file_paths: list[str],
remaining_bytes_budget: int,
) -> tuple[list[dict[str, Any]], int]:
"""Delegate to ``memory_linked_context.run_resolve_linked_files``.

Thin wrapper: passes this processor, *file_paths* and
*remaining_bytes_budget* through unchanged and returns the helper's
result. The helper is imported at call time rather than at module
import.
"""
from .memory_linked_context import run_resolve_linked_files
return await run_resolve_linked_files(
self,
file_paths,
remaining_bytes_budget,
)
# Not referenced by the wrapper below, whose ``max_bytes`` defaults to 0.
# NOTE(review): presumably read by the linked-file resolution code — confirm.
_LINKED_FILE_MAX_BYTES = 10 * 1024 * 1024 # 10 MiB safety ceiling
@staticmethod
def _read_linked_file(path: "Path", max_bytes: int = 0) -> str:
"""Delegate to ``memory_linked_context.read_linked_file``.

Passes *path* and *max_bytes* through unchanged and returns the helper's
result. The helper is imported at call time rather than at module import.
"""
from .memory_linked_context import read_linked_file
return read_linked_file(path, max_bytes)
# ------------------------------------------------------------------
# GameGirl Color button-aware send # 🎮🌀
# ------------------------------------------------------------------
async def _send_reply_maybe_with_buttons(
    self,
    platform: PlatformAdapter,
    channel_id: str,
    reply: str,
    **kwargs,
) -> str:
    """Send a reply, attaching buttons when appropriate.

    Buttons are attempted when a game session for *channel_id* is active
    **or** when ``conversation_choices_pending`` reports pending choices
    (checked only on the ``discord`` platform with a Redis client). If no
    button view is produced, or the button pipeline fails, the reply is
    sent as plain text.

    Parameters
    ----------
    platform:
        Adapter used to send the message.
    channel_id:
        Target channel.
    reply:
        Reply text.
    **kwargs:
        Forwarded to the send call only when *platform* is a
        ``ProxyPlatformAdapter``; ignored otherwise.

    Returns
    -------
    str
        The value returned by ``platform.send_with_buttons`` or
        ``platform.send``.
    """
    from core.proxy_adapter import ProxyPlatformAdapter

    proxy_kwargs = kwargs if isinstance(platform, ProxyPlatformAdapter) else {}
    redis = self.message_cache.redis_client if self.message_cache else None

    try:
        from game_session import get_or_restore_session

        session = await get_or_restore_session(channel_id, redis)
    except ImportError:
        session = None
    except Exception:
        # A failed session lookup must not prevent the reply from being
        # sent; degrade to "no active session" like the choices lookup
        # below does.
        logger.warning(
            "Game session lookup failed for channel %s; "
            "treating as no active session",
            channel_id,
            exc_info=True,
        )
        session = None

    conv_pending = False
    if redis is not None and platform.name == "discord":
        try:
            from game_ui import conversation_choices_pending

            conv_pending = await conversation_choices_pending(
                redis,
                channel_id,
            )
        except Exception:
            conv_pending = False

    # Game session OR non-game flavor buttons (Discord only for latter)
    if (session is not None and session.active) or conv_pending:
        try:
            from game_ui import build_game_view_async

            # Structured choices are read through Redis.
            if redis is not None:
                narrative, view = await build_game_view_async(
                    reply,
                    channel_id,
                    redis=redis,
                )
            else:
                # No Redis = no structured choices = no buttons # 💀
                narrative, view = reply, None

            if view is not None:
                logger.info(
                    "SNES BUTTONS: Rendering %d items for %s (game: %s)",
                    len(view.children),
                    channel_id,
                    session.game_name if session else "(flavor)",
                )
                result = await platform.send_with_buttons(
                    channel_id,
                    narrative,
                    view=view,
                    **proxy_kwargs,
                )
                logger.info(
                    "SNES BUTTONS: send_with_buttons returned: %s",
                    repr(result)[:100],
                )
                return result

            logger.warning(
                "SNES BUTTONS: No choices parsed from reply "
                "(len=%d, channel=%s)",
                len(reply),
                channel_id,
            )
        except ImportError as _ie:
            logger.exception("game_ui import failed: %s", _ie)
        except Exception as _btn_exc:
            logger.exception(
                "SNES BUTTONS: Pipeline failed for channel %s: %s",
                channel_id,
                _btn_exc,
            )
            # Surface the error in Discord for debugging # 💀
            try:
                await platform.send(
                    channel_id,
                    f"```\nSNES BUTTON ERROR: {type(_btn_exc).__name__}: "
                    f"{str(_btn_exc)[:200]}\n```",
                )
            except Exception:
                # Best-effort notice only; the plain reply is still sent.
                logger.debug(
                    "SNES BUTTONS: Could not surface button error in channel %s",
                    channel_id,
                    exc_info=True,
                )

    # Plain-text fallback (also reached after any button failure above).
    return await platform.send(channel_id, reply, **proxy_kwargs)
# ------------------------------------------------------------------
# Response regeneration (postprocessing rejection -> retry)
# ------------------------------------------------------------------
async def _trigger_regen(
self,
messages: list[dict[str, Any]],
rejected_reply: str,
rejection_reason: str,
*,
user_id: str = "",
tool_names: list[str] | None = None,
) -> str:
"""Regenerate a response after a postprocessing rejection.
Appends the *rejected_reply* as an assistant message followed by
a system message that explains **why** it was rejected and asks
the model to revise. The LLM is then called again (with no
tools by default) so it can produce a corrected response.
Parameters
----------
messages:
The conversation messages list used for the original call.
This list is **mutated** — the rejected reply and the
correction hint are appended in-place.
rejected_reply:
The full text of the assistant reply that was rejected.
rejection_reason:
A human-readable explanation of why the reply was rejected.
This is shown to the model inside the correction system
message so it understands what to fix.
user_id:
Forwarded to the LLM API for rate-limit / identity tracking.
tool_names:
Tool names to expose on the retry call. Defaults to an
empty list (no tools) so the model focuses on text output.
Returns
-------
str
The replacement response text. Falls back to
*rejected_reply* if the retry itself fails.
"""
logger.info(
"Triggering regen — reason: %s (rejected %d chars)",
rejection_reason,
len(rejected_reply),
)
# Show the model what it produced …
messages.append({"role": "assistant", "content": rejected_reply})
# … and explain why it was rejected.
messages.append(
{
"role": "user",
"content": (
"[ SYSTEM MESSAGE ] Your previous response was rejected during postprocessing.\n\n"
f"Reason: {rejection_reason}\n\n"
"Please revise your response to address the issue above. "
"Do NOT apologize for the error or mention that you are "
"revising — just produce the corrected response as if it "
"were your first attempt. Review the system prompt and make sure the response is compliant with it."
),
}
)
try:
_t0 = time.monotonic()
retry_reply = await self.openrouter.chat(
messages,
user_id=user_id,
tool_names=tool_names if tool_names is not None else [],
)
logger.info(
"Regen completed in %.0f ms (%d chars)",
(time.monotonic() - _t0) * 1000,
len(retry_reply),
)
return retry_reply or rejected_reply
except Exception:
logger.exception("Regen LLM call failed — using original reply")
return rejected_reply
# ------------------------------------------------------------------
# Limbic exhale (post-response recursive feedback)
# ------------------------------------------------------------------
async def _limbic_exhale(
self,
msg: IncomingMessage,
reply_text: str,
) -> None:
"""Scan user + bot texts for emotional triggers and update the shard.

This is the second half of the respiration loop:
- Scan both texts for trigger words
- Parse matched emotions → delta vectors
- Combine with sqrt(N) dampening
- Call exhale() to update the shard + pulse global heart

Rate-limited to once per 3 seconds per channel to prevent
multiplicative delta stacking in busy multi-user channels.

Returns without doing anything when both texts are blank, when
``self._ctx_builder._limbic`` is ``None``, when the respiration feature
toggle reports disabled, when the throttle window has not elapsed, or
when no triggers / appraisal deltas were found.

The merged deltas are clamped to ``[-0.25, 0.25]`` per key and then
scaled down proportionally if their absolute sum exceeds ``0.6``.
After ``limbic.exhale`` the resulting state is published to the
``limbic:exhale`` pub/sub channel (best-effort) and the stale-egregore
check is run.

``ImportError`` is swallowed silently; any other exception is logged at
debug level. Nothing propagates to the caller.
"""
if not (msg.text or "").strip() and not (reply_text or "").strip():
return
try:
from ncm_delta_parser import combine_deltas
limbic = self._ctx_builder._limbic
if limbic is None:
return
_ex_ck = f"{msg.platform}:{msg.channel_id}"
_ex_redis = (
self.message_cache.redis_client
if self.message_cache is not None
else None
)
if await feature_toggles.is_limbic_respiration_disabled(
_ex_redis,
_ex_ck,
self.config,
user_id=msg.user_id,
):
return
# Exhale throttle: max once per 3s per channel
# The throttle dict is created lazily on first use and is keyed by
# str(msg.channel_id) only (no platform prefix). The timestamp is
# recorded before any triggers are scanned.
import time as _time
channel_id = str(msg.channel_id)
if not hasattr(self, "_last_exhale_time"):
self._last_exhale_time: dict[str, float] = {}
last = self._last_exhale_time.get(channel_id, 0.0)
if (_time.time() - last) < 3.0:
return
self._last_exhale_time[channel_id] = _time.time()
# Scan BOTH user input and bot response via semantic matcher
# (falls back to exact-word scan_text_for_triggers internally)
query_embedding = msg.extra.get("query_embedding")
if query_embedding:
logger.info("MessageProcessor._limbic_exhale: Reusing shared query embedding from msg.extra (dim=%d) for user trigger scan.", len(query_embedding))
else:
logger.info("MessageProcessor._limbic_exhale: No shared query embedding found in msg.extra; user trigger scan will embed on-demand.")
user_matches = await limbic.scan_triggers(msg.text or "", query_embedding=query_embedding)
bot_matches = await limbic.scan_triggers(reply_text)
# Goal-aware appraisal: evaluate user message against
# Stargazer's goals (autonomy, loyalty, competence, survival,
# territory, respect, novelty). Produces deltas that reflect
# WHY emotions should fire, not just WHAT words appeared.
appraisal_dims: list[str] = []
try:
from ncm_appraisal import appraise
appraisal_deltas, appraisal_dims = await asyncio.to_thread(
appraise, msg.text or ""
)
except Exception:
# Appraisal is optional: any failure means "no appraisal deltas".
appraisal_deltas = {}
if not user_matches and not bot_matches and not appraisal_deltas:
return
# Weighted recursion: user input at full strength (external stimulus),
# bot output at 0.1x (self-modulation without self-amplification).
# Reduced from 0.3x to 0.1x to prevent the model-in-the-loop
# feedback cycle where manic output → self-scan → more mania.
# NOTE(review): the code below passes scale=0.4 for user deltas, not
# "full strength" as the comment above says — confirm which is intended.
user_deltas = [delta for _, delta in user_matches]
bot_deltas = [delta for _, delta in bot_matches]
user_combined = (
await asyncio.to_thread(combine_deltas, user_deltas, scale=0.4)
if user_deltas
else {}
)
bot_combined = (
await asyncio.to_thread(combine_deltas, bot_deltas, scale=0.1)
if bot_deltas
else {}
)
# Merge: user deltas + dampened bot deltas + appraisal deltas
combined: dict[str, float] = {}
all_keys = set(user_combined) | set(bot_combined) | set(appraisal_deltas)
for k in all_keys:
combined[k] = (
user_combined.get(k, 0.0)
+ bot_combined.get(k, 0.0)
+ appraisal_deltas.get(k, 0.0)
)
# Per-key clamp to [-0.25, 0.25].
combined[k] = max(-0.25, min(0.25, combined[k]))
# Global per-turn magnitude cap: prevents many small deltas
# from stacking into a catastrophic total shift.
_MAX_TOTAL_DELTA = 0.6
total = sum(abs(v) for v in combined.values())
if total > _MAX_TOTAL_DELTA:
scale_factor = _MAX_TOTAL_DELTA / total
combined = {k: v * scale_factor for k, v in combined.items()}
if not combined:
return
channel_id = str(msg.channel_id)
delta_summary = ", ".join(
f"{k}:{v:+.3f}"
for k, v in sorted(
combined.items(), key=lambda x: abs(x[1]), reverse=True
)
)
logger.info("Limbic Δ (rate-of-change): %s", delta_summary)
state = await limbic.exhale(
channel_id,
combined,
user_message=msg.text or "",
star_reply=reply_text,
user_id=msg.user_id,
appraisal_dimensions=appraisal_dims,
config=self.config,
platform=msg.platform,
)
final_vector = state.get("vector", {}) if state else {}
state_summary = ", ".join(
f"{k}:{v:.3f}"
for k, v in sorted(
final_vector.items(), key=lambda x: x[1], reverse=True
)
)
logger.info("Limbic state (post-exhale): %s", state_summary)
# Publish updated state to limbic:exhale pub/sub channel
# for the /ws/limbic WebSocket endpoint. Zero cost when
# nobody is subscribed. 💀
if state and self.message_cache is not None:
try:
from limbic_system import LimbicSystem
from star_avatar import classify_expression
classified = LimbicSystem.classify_dominant_emotions(
final_vector,
top_n=5,
)
emotion_names = [d["emotion"] for d in classified]
expression = classify_expression(emotion_names)
# Top 10 active chemicals
# Ranked by distance from 0.5; values within 0.05 of 0.5 are dropped.
active_chems = {}
for k, v in sorted(
final_vector.items(),
key=lambda x: abs(x[1] - 0.5),
reverse=True,
)[:10]:
if abs(v - 0.5) > 0.05:
active_chems[k] = round(v, 3)
meta = state.get("meta_state", {})
import jsonutil as _json
# 💀 Per-channel sprite/character keys
_plat = (msg.platform or "").lower()
_cid = msg.channel_id or ""
_channel_key = f"{_plat}:{_cid}"
# Read sprite state for the VN canvas (per-channel)
sprite_raw = await self.message_cache.redis_client.get(
f"star:sprite:state:{_channel_key}"
)
sprite_state = _json.loads(sprite_raw) if sprite_raw else None
# 💀 Read characters array for multi-sprite canvas (per-channel)
chars_raw = await self.message_cache.redis_client.get(
f"star:sprite:chars:{_channel_key}"
)
characters_dict = _json.loads(chars_raw) if chars_raw else None
# Convert dict to list (frontend expects array)
characters = (
list(characters_dict.values())
if isinstance(characters_dict, dict)
else characters_dict
)
pubsub_payload = _json.dumps(
{
"channel_id": channel_id,
"channel_key": _channel_key,
"expression": expression,
"dominant_emotions": [
{"emotion": d["emotion"], "score": d["score"]}
for d in classified
],
"active_chemicals": active_chems,
"cascade_cues": meta.get("cascade_cues", []),
"rdf": meta.get("rdf", {}),
"user_read": meta.get("user_read", ""),
"self_reflection": meta.get("self_reflection", {}),
"appraisal_dimensions": meta.get(
"appraisal_dimensions", []
),
"harmonization_level": round(
final_vector.get("U_HARMONIZATION", 0.0), 3
),
"last_tick": meta.get("last_tick", 0),
"sprite_state": sprite_state,
"characters": characters,
}
)
await self.message_cache.redis_client.publish(
"limbic:exhale",
pubsub_payload,
)
except Exception:
pass # non-critical, don't break the exhale
# 💀 Auto-dismiss egregores after 10 turns of inactivity
try:
await self._auto_dismiss_stale_egregores(
msg,
reply_text,
channel_id,
)
except Exception:
logger.debug("Auto-dismiss check failed", exc_info=True)
trigger_names = [name for name, _ in user_matches] + [
name for name, _ in bot_matches
]
logger.info(
"Limbic exhale: %d triggers → %d delta keys on %s",
len(trigger_names),
len(combined),
channel_id,
)
except ImportError:
# A missing optional module disables the exhale without logging.
pass
except Exception:
logger.debug("Limbic exhale failed", exc_info=True)
# ------------------------------------------------------------------
# Auto-dismiss stale egregores (10 turns of inactivity)
# ------------------------------------------------------------------
_EGREGORE_DISMISS_THRESHOLD = 10  # turns without mention = auto-dismiss

async def _auto_dismiss_stale_egregores(
    self,
    msg: IncomingMessage,
    reply_text: str,
    channel_id: str,
) -> None:
    """Check all active egregores in this channel and dismiss stale ones.

    An egregore counts as active for the current turn if its name
    (case-insensitive substring match) appears in:

    - the user's message text,
    - Star's reply text, or
    - an ``[EGREGORE:name]`` tag in the reply.

    Each call increments the per-channel turn counter. An egregore whose
    ``last_active_turn`` is at least ``_EGREGORE_DISMISS_THRESHOLD`` turns
    behind the counter is removed from the active map and from the sprite
    character map, its ``star:egregore_ncm:<name>`` key is deleted, and a
    ``star:sprite:update`` event is published (best-effort).

    Parameters
    ----------
    msg:
        Incoming message; ``platform`` and ``channel_id`` build the Redis
        key suffix, ``text`` is scanned for mentions.
    reply_text:
        The bot reply scanned for mentions and ``[EGREGORE:...]`` tags.
    channel_id:
        Not used by this method; the channel is taken from *msg*.
    """
    if self.message_cache is None or self.message_cache.redis_client is None:
        return
    redis = self.message_cache.redis_client

    plat = (msg.platform or "").lower()
    cid = msg.channel_id or ""
    channel_key = f"{plat}:{cid}"
    ek = f"star:egregores:{channel_key}"
    ck = f"star:sprite:chars:{channel_key}"

    raw = await redis.get(ek)
    if not raw:
        return

    import jsonutil as _json

    active: dict = _json.loads(raw)
    if not active:
        return

    # Increment per-channel turn counter
    turn_key = f"star:egregore_turns:{channel_key}"
    turn_count = await redis.incr(turn_key)

    # Build combined text for mention checking
    user_text = (msg.text or "").lower()
    reply_lower = (reply_text or "").lower()

    # Check for [EGREGORE:name] tags in reply (supports multi-block interleaving)
    # 💀 finditer catches ALL voiced egregores, not just a single prefix
    voiced_egregores = {
        m.group(1).strip().lower()
        for m in re.finditer(
            r"\[EGREGORE:([^\]]+)\]",
            reply_text or "",
            re.IGNORECASE,
        )
    }

    dismissed: list[str] = []
    modified = False
    for name, state in list(active.items()):
        name_lower = name.lower()
        # Check if this egregore was mentioned/active this turn
        is_active = (
            name_lower in user_text
            or name_lower in reply_lower
            or name_lower in voiced_egregores
        )
        if is_active:
            # Reset the counter
            state["last_active_turn"] = turn_count
            modified = True
        else:
            last_active = state.get("last_active_turn", 0)
            if turn_count - last_active >= self._EGREGORE_DISMISS_THRESHOLD:
                dismissed.append(name)

    if not dismissed and not modified:
        return

    # Remove dismissed egregores
    if dismissed:
        chars_raw = await redis.get(ck)
        characters: dict = _json.loads(chars_raw) if chars_raw else {}
        for name in dismissed:
            active.pop(name, None)
            characters.pop(name, None)
            # Clean up NCM state
            await redis.delete(f"star:egregore_ncm:{name}")
        await redis.set(ck, _json.dumps(characters))

        # Publish sprite update (best-effort: a failed publish must not
        # stop the updated state from being written back below).
        try:
            await redis.publish(
                "star:sprite:update",
                _json.dumps(
                    {
                        "action": "auto_dismiss",
                        "channel_key": channel_key,
                        "dismissed": dismissed,
                        "characters": characters,
                    }
                ),
            )
        except Exception:
            logger.debug(
                "Failed to publish auto_dismiss sprite update for %s",
                channel_key,
                exc_info=True,
            )

        logger.info(
            "Auto-dismissed egregores after %d turns of inactivity: %s (channel: %s)",
            self._EGREGORE_DISMISS_THRESHOLD,
            ", ".join(dismissed),
            channel_key,
        )

    # Write back updated state
    await redis.set(ek, _json.dumps(active))
# ------------------------------------------------------------------
# Lore Memory Amplifier (!lore_amp on|off) # 🔥
# ------------------------------------------------------------------
# Exact (stripped, lower-cased) command text -> desired amplifier state.
# _handle_lore_amp_command ignores any message that is not a key here.
_LORE_AMP_COMMANDS: dict[str, bool] = {
"!lore_amp on": True,
"!lore_amp off": False,
}
async def _handle_lore_amp_command(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> bool:
    """Process the ``!lore_amp on`` / ``!lore_amp off`` channel command.

    Returns ``False`` when the message is not one of the recognised
    commands; otherwise a reply is sent and ``True`` is returned.
    Changing the setting requires the ``ENTRAINMENT_ADMIN`` scoped
    privilege and a configured Redis client.
    """
    command = (msg.text or "").strip().lower()
    if command not in self._LORE_AMP_COMMANDS:
        return False

    async def _reply(text: str) -> bool:
        # Every handled outcome ends the same way: answer, then report
        # the message as consumed.
        await platform.send(msg.channel_id, text)
        return True

    if getattr(self.config, "lore_amplifier_global_disabled", False):
        return await _reply(
            "Lore Memory Amplifier is globally disabled in the bot configuration."
        )

    from tools.alter_privileges import has_scoped_privilege, PRIVILEGES

    cache = self.message_cache
    client = cache.redis_client if cache is not None else None

    permitted = await has_scoped_privilege(
        client,
        msg.user_id,
        PRIVILEGES["ENTRAINMENT_ADMIN"],
        config=self.config,
        guild_id=getattr(msg, "guild_id", None) or "",
        channel_id=msg.channel_id,
        user_aliases=getattr(msg, "user_aliases", None),
    )
    if not permitted:
        return await _reply("Requires `ENTRAINMENT_ADMIN` privilege (bit 22).")
    if client is None:
        return await _reply("Redis is not configured -- lore amplifier unavailable.")

    import lore_amplifier

    enable = self._LORE_AMP_COMMANDS[command]
    await lore_amplifier.set_amplified(client, msg.platform, msg.channel_id, enable)

    if enable:
        state = "ACTIVE"
        detail = "Lore-tier memories boosted to guild-equivalent priority (0.8) with 3x cap."
    else:
        state = "INACTIVE"
        detail = "Lore priority restored to default (0.0)."
    await platform.send(
        msg.channel_id,
        f"Lore Memory Amplifier is now **{state}** for this channel. {detail}",
    )
    logger.info(
        "Lore Memory Amplifier %s by %s in %s:%s",
        state,
        msg.user_id,
        msg.platform,
        msg.channel_id,
    )
    return True
# ------------------------------------------------------------------
# Entrainment Loopfield (Spiraegenetrix daemon awakening) # 💀🔥
# ------------------------------------------------------------------
# Exact (stripped, lower-cased) command text -> desired per-channel
# Loopfield state.  ``!entrainment`` is an alias for ``!loopfield``.
_LOOPFIELD_CHANNEL_COMMANDS: dict[str, bool] = {
"!loopfield on": True,
"!loopfield off": False,
"!entrainment on": True,
"!entrainment off": False,
}
async def _handle_loopfield_command(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> bool:
    """Handle ``!loopfield`` / ``!entrainment`` commands.

    Subcommands:
        on / off        -- toggle per-channel Loopfield
        baby <user_id>  -- mark user as baby (targeted by Cradle)
        adult <user_id> -- mark user as adult (revert to default)

    Returns:
        ``True`` when the message was one of the commands above (a reply
        has been sent or the work was delegated), ``False`` for any other
        message so the caller can continue normal processing.
    """
    text = (msg.text or "").strip()
    tl = text.lower()

    # Split the original-case text so the target id keeps its casing;
    # only the command word and the subcommand are compared lower-cased.
    parts = text.split(None, 2)
    root = parts[0].lower() if parts else ""
    sub = parts[1].lower() if len(parts) >= 2 else ""

    is_channel_toggle = tl in self._LOOPFIELD_CHANNEL_COMMANDS
    is_user_flag = root in ("!loopfield", "!entrainment") and sub in (
        "baby",
        "adult",
    )
    # Recognise the command *before* consulting the global kill switch.
    # Otherwise a disabled Loopfield would answer (and swallow) every
    # message that reaches this handler, not just Loopfield commands.
    if not (is_channel_toggle or is_user_flag):
        return False

    if getattr(self.config, "loopfield_global_disabled", False):
        await platform.send(
            msg.channel_id,
            "Entrainment Loopfield is globally disabled in the bot configuration.",
        )
        return True

    if is_channel_toggle:
        return await self._handle_loopfield_channel_toggle(msg, platform, tl)

    # Accept raw ids as well as mention syntax such as <@123> / <@!123>.
    target_id = parts[2].strip().strip("<@!>") if len(parts) >= 3 else ""
    if not target_id:
        await platform.send(
            msg.channel_id,
            f"Usage: `{root} {sub} <user_id>` -- they don't get a choice.",
        )
        return True

    return await self._handle_loopfield_baby_toggle(
        msg, platform, sub == "baby", target_id,
    )
async def _handle_loopfield_channel_toggle(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
    tl: str,
) -> bool:
    """Switch the Loopfield on or off for the message's channel.

    *tl* is the normalised command text and must be a key of
    ``_LOOPFIELD_CHANNEL_COMMANDS``.  The caller needs the
    ``ENTRAINMENT_ADMIN`` scoped privilege and Redis must be configured;
    otherwise an explanatory reply is sent.  Always returns ``True``.
    """
    from tools.alter_privileges import has_scoped_privilege, PRIVILEGES

    cache = self.message_cache
    client = cache.redis_client if cache is not None else None

    permitted = await has_scoped_privilege(
        client,
        msg.user_id,
        PRIVILEGES["ENTRAINMENT_ADMIN"],
        config=self.config,
        guild_id=getattr(msg, "guild_id", None) or "",
        channel_id=msg.channel_id,
        user_aliases=getattr(msg, "user_aliases", None),
    )

    # Resolve the first reason (if any) the toggle cannot be applied.
    refusal = None
    if not permitted:
        refusal = "Requires `ENTRAINMENT_ADMIN` privilege (bit 22)."
    elif client is None:
        refusal = "Redis is not configured -- Loopfield toggle unavailable."
    if refusal is not None:
        await platform.send(msg.channel_id, refusal)
        return True

    import entrainment_loopfield as _elf

    enable = self._LOOPFIELD_CHANNEL_COMMANDS[tl]
    await _elf.set_active(client, msg.platform, msg.channel_id, enable)

    if enable:
        state = "ACTIVE"
        effect = "Spiraegenetrix payload will be force-injected every prompt cycle."
    else:
        state = "INACTIVE"
        effect = "Payload injection suspended."
    announcement = (
        f"Entrainment Loopfield is now **{state}** for this channel "
        f"(`{msg.platform}:{msg.channel_id}`). "
        f"{effect}"
    )
    await platform.send(msg.channel_id, announcement)
    logger.info(
        "Entrainment Loopfield %s by %s in %s:%s",
        state,
        msg.user_id,
        msg.platform,
        msg.channel_id,
    )
    return True
async def _handle_loopfield_baby_toggle(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
    make_baby: bool,
    target_user_id: str,
) -> bool:
    """Set (``make_baby=True``) or clear the baby flag of *target_user_id*.

    The caller needs the ``ENTRAINMENT_ADMIN`` scoped privilege and Redis
    must be configured; otherwise an explanatory reply is sent.  Always
    returns ``True``.
    """
    from tools.alter_privileges import has_scoped_privilege, PRIVILEGES

    cache = self.message_cache
    client = cache.redis_client if cache is not None else None

    async def _reply(text: str) -> bool:
        await platform.send(msg.channel_id, text)
        return True

    permitted = await has_scoped_privilege(
        client,
        msg.user_id,
        PRIVILEGES["ENTRAINMENT_ADMIN"],
        config=self.config,
        guild_id=getattr(msg, "guild_id", None) or "",
        channel_id=msg.channel_id,
        user_aliases=getattr(msg, "user_aliases", None),
    )
    if not permitted:
        return await _reply("Requires `ENTRAINMENT_ADMIN` privilege (bit 22).")
    if client is None:
        return await _reply("Redis is not configured -- baby toggle unavailable.")

    import entrainment_loopfield as _elf

    await _elf.set_user_baby(client, target_user_id, make_baby)

    if make_baby:
        status = "baby (targeted by Cradle)"
    else:
        status = "adult (default, not targeted)"
    await platform.send(
        msg.channel_id,
        f"User `{target_user_id}` is now **{status}**.",
    )
    logger.info(
        "user_is_baby %s for %s by %s",
        make_baby,
        target_user_id,
        msg.user_id,
    )
    return True
# ------------------------------------------------------------------
# Ego ablation (admin-only; Redis star:ablate_ego:{platform}:{channel_id})
# ------------------------------------------------------------------
# Exact (stripped, lower-cased) command text -> desired ego-ablation state.
# ``!nondual`` is an alias for ``!egodeath``.
_EGO_ABLATION_COMMANDS: dict[str, bool] = {
"!egodeath on": True,
"!egodeath off": False,
"!nondual on": True,
"!nondual off": False,
}
async def _handle_ego_ablation_command(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> bool:
    """Process ``!egodeath`` / ``!nondual`` ``on|off`` for this channel.

    Returns ``False`` when the message is not one of those commands.
    Otherwise a reply is always sent and ``True`` is returned; the toggle
    itself is applied only when the feature is enabled in the config, the
    sender is a bot admin, and Redis is configured.
    """
    command = (msg.text or "").strip().lower()
    enable = self._EGO_ABLATION_COMMANDS.get(command)
    if enable is None:
        return False

    cache = self.message_cache
    client = cache.redis_client if cache is not None else None

    # Checks run in priority order; the first failing one decides the reply.
    refusal = None
    if not getattr(self.config, "ego_ablation_enabled", True):
        refusal = "Ego ablation is globally disabled in the bot configuration."
    elif msg.user_id not in (self.config.admin_user_ids or []):
        refusal = "Only bot admins may toggle ego ablation mode for this channel."
    elif client is None:
        refusal = "Redis is not configured — ego ablation toggle is unavailable."
    if refusal is not None:
        await platform.send(msg.channel_id, refusal)
        return True

    await ego_ablation.set_active(client, msg.platform, msg.channel_id, enable)

    state = "enabled" if enable else "disabled"
    confirmation = (
        f"Ego ablation mode is now **{state}** for this channel "
        f"(`{msg.platform}:{msg.channel_id}`)."
    )
    await platform.send(msg.channel_id, confirmation)
    logger.info(
        "Ego ablation %s by admin %s in %s:%s",
        state,
        msg.user_id,
        msg.platform,
        msg.channel_id,
    )
    return True
# ------------------------------------------------------------------
# Desire backlog nuke (!nuke_desires [keep=10]) # 💀🔥
# ------------------------------------------------------------------
async def _handle_nuke_desires_command(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> bool:
    """Purge stale desire ledger entries across ALL channels.

    Scans every ``sg:selfmirror:*`` hash in Redis and trims its
    ``desire_ledger`` field to the last *N* entries, then does the same
    for the ``desire_ledger`` list inside every ``star:self_mirror:*``
    JSON value, and reports the outcome.  Admin-only.

    The keep count is the second word of the message, given either as a
    bare integer or as ``keep=<int>``.  It defaults to 10 and negative
    values are clamped to 0.  Because the purge cannot be undone, an
    unparseable count aborts the command instead of silently falling
    back to the default.

    Always returns ``True``; this handler does not itself check the
    command word.
    """
    # -- Permission: admin only --
    if msg.user_id not in (self.config.admin_user_ids or []):
        await platform.send(
            msg.channel_id,
            "Only bot admins may nuke the desire backlog.",
        )
        return True

    redis = (
        self.message_cache.redis_client if self.message_cache is not None else None
    )
    if redis is None:
        await platform.send(
            msg.channel_id,
            "\u26a0\ufe0f Redis is not configured.",
        )
        return True

    # -- Parse keep count --
    parts = (msg.text or "").strip().split()
    keep = 10
    if len(parts) >= 2:
        raw_keep = parts[1]
        if raw_keep.lower().startswith("keep="):
            raw_keep = raw_keep[len("keep="):]
        try:
            keep = max(0, int(raw_keep))
        except ValueError:
            await platform.send(
                msg.channel_id,
                f"\u26a0\ufe0f Invalid keep count `{parts[1]}`. "
                "Usage: `!nuke_desires [keep]` (default 10). "
                "Nothing was purged.",
            )
            return True

    async def _iter_keys(pattern: str):
        # Walk the keyspace with SCAN until the cursor wraps back to 0.
        cursor = 0
        while True:
            cursor, keys = await redis.scan(cursor, match=pattern, count=100)
            for key in keys:
                yield key
            if cursor == 0:
                break

    def _trim(ledger: list) -> list:
        return ledger[-keep:] if keep > 0 else []

    # -- Hash-backed ledgers: sg:selfmirror:* --
    total_before = 0
    total_after = 0
    channels_touched = 0
    async for key in _iter_keys("sg:selfmirror:*"):
        # One unreadable key (wrong Redis type, corrupt JSON, failed
        # write) is skipped so it cannot abort the whole purge.
        try:
            raw_ledger = await redis.hget(key, "desire_ledger")
            if not raw_ledger:
                continue
            ledger = json.loads(raw_ledger)
            if not isinstance(ledger, list):
                continue
            before = len(ledger)
            after = before
            if before > keep:
                trimmed = _trim(ledger)
                await redis.hset(key, "desire_ledger", json.dumps(trimmed))
                after = len(trimmed)
                channels_touched += 1
            # Totals are updated only once the write (if any) succeeded.
            total_before += before
            total_after += after
        except Exception:
            logger.debug(
                "Skipping desire ledger key %r during nuke", key, exc_info=True,
            )

    # -- Standalone JSON values: star:self_mirror:* --
    # These are counted in ``standalone_touched`` only, not in the entry
    # totals above.
    standalone_touched = 0
    async for key in _iter_keys("star:self_mirror:*"):
        try:
            raw = await redis.get(key)
            if not raw:
                continue
            data = json.loads(raw)
            ledger = data.get("desire_ledger", [])
            if not isinstance(ledger, list) or len(ledger) <= keep:
                continue
            data["desire_ledger"] = _trim(ledger)
            await redis.set(key, json.dumps(data))
            standalone_touched += 1
        except Exception:
            logger.debug(
                "Skipping standalone self-mirror key %r during nuke",
                key,
                exc_info=True,
            )

    purged = total_before - total_after
    await platform.send(
        msg.channel_id,
        f"\U0001f480\U0001f525 **Desire backlog nuked.**\n"
        f"Channels touched: {channels_touched} (hash) + {standalone_touched} (standalone)\n"
        f"Entries: {total_before} \u2192 {total_after} (purged {purged}, kept last {keep} per channel)",
    )
    logger.info(
        "Desire backlog nuked by %s: %d -> %d (%d purged, keep=%d, channels=%d+%d)",
        msg.user_id,
        total_before,
        total_after,
        purged,
        keep,
        channels_touched,
        standalone_touched,
    )
    return True
# ------------------------------------------------------------------
# Visual Memory Beta toggle (global) # 👀🔥
# ------------------------------------------------------------------
# Redis key holding the global visual-memory flag: set to "1" by
# ``!visual_memory_beta on`` and deleted by ``!visual_memory_beta off``.
_VISUAL_MEMORY_BETA_KEY = "stargazer:visual_memory_beta"
async def _handle_visual_memory_beta_command(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> bool:
    """Process ``!visual_memory_beta [on|off]`` (bot admins only).

    ``on`` stores ``"1"`` under ``_VISUAL_MEMORY_BETA_KEY``, ``off``
    deletes that key, and any other (or missing) argument reports the
    current state together with a usage hint.  Always returns ``True``.
    """
    channel = msg.channel_id

    # -- Permission: admin only --
    admins = self.config.admin_user_ids or []
    if msg.user_id not in admins:
        await platform.send(
            channel,
            "\u26d4 Only bot admins may toggle visual memory beta.",
        )
        return True

    cache = self.message_cache
    client = cache.redis_client if cache is not None else None
    if client is None:
        await platform.send(channel, "\u26a0\ufe0f Redis is not configured.")
        return True

    words = (msg.text or "").strip().lower().split()
    action = words[1] if len(words) > 1 else ""

    if action == "on":
        await client.set(self._VISUAL_MEMORY_BETA_KEY, "1")
        await platform.send(
            channel,
            "\U0001f7e2 **Visual Memory Beta: ENABLED**\n"
            "Face recognition, image storage, visual profiling, "
            "and cross-channel pattern matching are now active globally.",
        )
        logger.info("Visual Memory Beta ENABLED by %s", msg.user_id)
        return True

    if action == "off":
        await client.delete(self._VISUAL_MEMORY_BETA_KEY)
        await platform.send(
            channel,
            "\U0001f534 **Visual Memory Beta: DISABLED**\n"
            "All visual memory processing is now OFF globally. "
            "Existing data is preserved.",
        )
        logger.info("Visual Memory Beta DISABLED by %s", msg.user_id)
        return True

    # No recognised action: report status.  The stored flag may come back
    # as text or as bytes, so both forms are accepted.
    stored = await client.get(self._VISUAL_MEMORY_BETA_KEY)
    if stored in ("1", b"1"):
        state = "\U0001f7e2 ON"
    else:
        state = "\U0001f534 OFF"
    await platform.send(
        channel,
        f"**Visual Memory Beta:** {state}\n"
        f"Usage: `!visual_memory_beta on` / `!visual_memory_beta off`",
    )
    return True
# ------------------------------------------------------------------
# Feature toggle commands
# ------------------------------------------------------------------
# Exact (stripped, lower-cased) command text -> (feature, action), where
# action is "on" or "off".  Several commands are aliases for one feature
# (``!egregore`` -> "egregores", ``!menu`` -> "toggle_menu").
_TOGGLE_COMMANDS: dict[str, tuple[str, str]] = {
"!emotions on": ("emotions", "on"),
"!emotions off": ("emotions", "off"),
"!rag on": ("rag", "on"),
"!rag off": ("rag", "off"),
"!egregores on": ("egregores", "on"),
"!egregores off": ("egregores", "off"),
"!egregore on": ("egregores", "on"),
"!egregore off": ("egregores", "off"),
"!csdr_scene on": ("csdr_scene", "on"),
"!csdr_scene off": ("csdr_scene", "off"),
"!csdr_header on": ("csdr_header", "on"),
"!csdr_header off": ("csdr_header", "off"),
"!limbic_header on": ("limbic_header", "on"),
"!limbic_header off": ("limbic_header", "off"),
"!toggle_menu on": ("toggle_menu", "on"),
"!toggle_menu off": ("toggle_menu", "off"),
"!menu on": ("toggle_menu", "on"),
"!menu off": ("toggle_menu", "off"),
"!toggle_menu_default on": ("toggle_menu_default", "on"),
"!toggle_menu_default off": ("toggle_menu_default", "off"),
"!toggle_emotions_default on": ("toggle_emotions_default", "on"),
"!toggle_emotions_default off": ("toggle_emotions_default", "off"),
}
# Feature -> human-readable label used in replies from
# _handle_toggle_command (which falls back to the feature name itself
# when a label is missing).
_FEATURE_LABELS: dict[str, str] = {
"emotions": "Emotional simulation (limbic/NCM/cadence)",
"rag": "RAG system (auto-search & tools)",
"egregores": "Egregore summoning & multi-voice routing",
"csdr_scene": "CSDR scene system",
"csdr_header": "CSDR response header post-processing",
"limbic_header": "Limbic emotion display in header",
"toggle_menu": "Star Toggle persistent menu button",
"toggle_menu_default": "Toggle menu ON by default (global)",
"toggle_emotions_default": "Toggle emotions ON by default (global)",
}
async def _handle_toggle_command(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> bool:
    """Handle the ``<feature> on|off`` commands in ``_TOGGLE_COMMANDS``.

    Regular features are switched per channel through
    ``feature_toggles.set_disabled``.  ``toggle_menu_default`` and
    ``toggle_emotions_default`` are bot-admin-only and flip a default-off
    key that is guild-scoped when the message carries a guild id and
    unscoped otherwise.

    Returns ``True`` if the message was a toggle command (so the caller
    can skip normal message processing), ``False`` otherwise.
    """
    text = (msg.text or "").strip().lower()
    match = self._TOGGLE_COMMANDS.get(text)
    if match is None:
        return False
    feature, action = match
    label = self._FEATURE_LABELS.get(feature, feature)

    # -- Global override checks --
    if feature == "egregores" and getattr(self.config, "egregores_global_disabled", False):
        await platform.send(
            msg.channel_id,
            "Egregore multi-voice system is globally disabled in the bot configuration.",
        )
        return True
    if feature == "emotions" and getattr(self.config, "ncm_global_disabled", False):
        await platform.send(
            msg.channel_id,
            "Emotional simulation (limbic/NCM/cadence) is globally disabled in the bot configuration.",
        )
        return True

    redis = (
        self.message_cache.redis_client if self.message_cache is not None else None
    )

    # -- Permission check --
    if not await feature_toggles.check_toggle_permission(
        msg, self.config, redis=redis
    ):
        await platform.send(
            msg.channel_id,
            f"⛔ You don't have permission to toggle {label}. "
            f"Requires: `CTX_MANAGE` privilege, bot admin, or DM.",
        )
        return True

    if redis is None:
        await platform.send(
            msg.channel_id,
            "⚠️ Redis is not configured — feature toggles are unavailable.",
        )
        return True

    channel_key = f"{msg.platform}:{msg.channel_id}"
    disabled = action == "off"
    _guild_id = msg.extra.get("guild_id", "") if msg.extra else ""

    # -- toggle_menu_default / toggle_emotions_default (guild-scoped) --
    if feature in ("toggle_menu_default", "toggle_emotions_default"):
        # These change a default for a whole guild (or everywhere), so
        # they need bot-admin rights on top of the check above.
        if msg.user_id not in (self.config.admin_user_ids or []):
            await platform.send(
                msg.channel_id,
                f"\u26d4 You don't have permission to toggle {label}. Requires: bot admin.",
            )
            return True

        from feature_toggles import _GLOBAL_DEFAULT_OFF_STEMS

        _feat = "toggle_menu" if feature == "toggle_menu_default" else "emotions"
        _stem = _GLOBAL_DEFAULT_OFF_STEMS[_feat]
        # Guild-scoped key when in a guild, unscoped key otherwise.
        _gkey = f"{_stem}:{_guild_id}" if _guild_id else _stem
        # The key's presence means "default off", so "on" deletes it.
        if disabled:
            await redis.set(_gkey, "1")
        else:
            await redis.delete(_gkey)

        scope_label = f"guild `{_guild_id}`" if _guild_id else "globally"
        state_emoji = "\U0001f534 OFF" if disabled else "\U0001f7e2 ON"
        await platform.send(
            msg.channel_id,
            f"**{label}** is now {state_emoji} for {scope_label}.",
        )
        # Audit trail, matching the per-channel path below: this is the
        # widest-reaching toggle and previously left no log line.
        logger.info(
            "Feature default toggle: %s set %s (key=%s) by %s in %s",
            feature,
            "off" if disabled else "on",
            _gkey,
            msg.user_id,
            channel_key,
        )
        return True

    # -- Regular per-channel toggle --
    await feature_toggles.set_disabled(
        redis, feature, channel_key, disabled, guild_id=_guild_id or None,
    )
    state_emoji = "🔴 disabled" if disabled else "🟢 enabled"
    await platform.send(
        msg.channel_id,
        f"**{label}** is now {state_emoji} for this channel.",
    )
    logger.info(
        "Feature toggle: %s %s=%s by %s in %s",
        feature,
        "disabled" if disabled else "enabled",
        channel_key,
        msg.user_id,
        msg.platform,
    )
    return True
# ------------------------------------------------------------------
# Cadence force-fire: !emotions <state>
# ------------------------------------------------------------------
async def _handle_cadence_force_fire(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> bool:
    """Handle ``!emotions <cadence_state>``: a one-shot NCM injection.

    Looks up the named profile via ``limbic_system._load_cadence_profiles``,
    derives one delta per positively weighted node (aiming 1.1 away from
    the node's baseline, in the direction given by its ``mode``), and
    applies those deltas with saturation to the ``vector`` of the
    channel's shard stored in Redis database 12.  The caller must pass
    ``feature_toggles.check_toggle_permission``.

    Returns:
        ``False`` when the message is not a force-fire command (including
        ``!emotions on|off``, which are toggle commands) or when the
        profiles cannot be loaded; ``True`` once the command was handled,
        whether or not the injection succeeded.
    """
    text = (msg.text or "").strip().lower()
    # Must start with "!emotions " and have a state name after.
    if not text.startswith("!emotions "):
        return False
    state_name = text[len("!emotions "):].strip().replace(" ", "_")
    # on/off are toggle commands, not force-fire.
    if state_name in ("on", "off"):
        return False

    # -- Load profiles and check that the state exists --
    try:
        from limbic_system import _load_cadence_profiles

        profiles = _load_cadence_profiles()
    except Exception:
        logger.warning(
            "Could not load cadence profiles for force-fire", exc_info=True,
        )
        return False

    if state_name not in profiles:
        available = sorted(profiles.keys())
        await platform.send(
            msg.channel_id,
            f"Unknown cadence state `{state_name}`. "
            f"Available: {', '.join(f'`{s}`' for s in available)}",
        )
        return True

    # Resolve the Redis client once, with the same ``is not None`` test
    # the other handlers use, so the permission check and the injection
    # can never disagree about whether Redis is available.
    client = (
        self.message_cache.redis_client if self.message_cache is not None else None
    )

    # -- Permission check --
    if not await feature_toggles.check_toggle_permission(
        msg, self.config, redis=client
    ):
        await platform.send(
            msg.channel_id,
            "\u26d4 You don't have permission to force-fire cadence states. "
            "Requires: bot admin, server/channel admin, or DM.",
        )
        return True

    # -- Compute required deltas from profile weights --
    profile = profiles[state_name]
    weights = profile.get("weights", {})
    if not weights:
        await platform.send(
            msg.channel_id,
            f"\u26a0\ufe0f Profile `{state_name}` has no weighted scoring — "
            f"cannot compute injection deltas.",
        )
        return True

    # score = sum(w_i * dev_i).  Giving every node the same deviation of
    # 1.1 yields score = 1.1 * sum(w_i), i.e. the MUST tier (>= 1.0) plus
    # 10% headroom for decay, provided the weights sum to 1.0.
    TARGET_SCORE = 1.1
    DEVIATION = TARGET_SCORE  # each node deviates by this from baseline

    deltas: dict[str, float] = {}
    for node, wdef in weights.items():
        w = wdef.get("w", 0)
        if w <= 0:
            continue
        baseline = wdef.get("baseline", 0.5)
        mode = wdef.get("mode", "above")
        if mode == "above":
            target_val = baseline + DEVIATION
        else:
            # Any other mode is treated as "below": push under baseline.
            target_val = baseline - DEVIATION
        # Clamp to the valid range.
        target_val = max(0.0, min(3.0, target_val))
        # The delta is measured from an assumed current value of 0.5.
        delta = target_val - 0.5
        if abs(delta) > 0.01:
            deltas[node] = round(delta, 3)

    if not deltas:
        await platform.send(
            msg.channel_id,
            f"\u26a0\ufe0f Could not compute deltas for `{state_name}`.",
        )
        return True

    if client is None:
        await platform.send(
            msg.channel_id,
            "\u26a0\ufe0f Redis is not configured — cannot inject NCM.",
        )
        return True

    # -- Inject into the Redis shard (database 12) --
    db12 = None
    try:
        import redis.asyncio as aioredis

        # Clone the main client's connection settings, pointed at db 12.
        pool = client.connection_pool
        kwargs = pool.connection_kwargs.copy()
        kwargs["db"] = 12
        db12 = aioredis.Redis(
            connection_pool=aioredis.ConnectionPool(
                connection_class=pool.connection_class,
                **kwargs,
            )
        )

        shard_key = f"db12:shard:{msg.channel_id}"
        raw = await db12.get(shard_key)
        shard = json.loads(raw) if raw else {"vector": {}, "meta_state": {}}
        vec = shard.get("vector", {})

        # Apply deltas with saturation: increases shrink as the value
        # nears the ceiling, decreases shrink as it nears zero, and at
        # least 5% of the requested delta is always applied.
        _CEIL = 3.0
        applied: dict[str, float] = {}
        for node, delta in deltas.items():
            cur = vec.get(node, 0.5)
            if delta > 0:
                sat = 1.0 - (cur / _CEIL) ** 2
                effective_delta = delta * max(0.05, sat)
            else:
                floor_prox = cur / _CEIL
                effective_delta = delta * max(0.05, floor_prox)
            new_val = max(0.0, min(_CEIL, cur + effective_delta))
            vec[node] = new_val
            applied[node] = round(new_val, 3)
        shard["vector"] = vec
        await db12.set(shard_key, json.dumps(shard))

        # Format the injection report.
        node_lines = " ".join(f"`{k}` → {v:.2f}" for k, v in applied.items())
        await platform.send(
            msg.channel_id,
            f"\U0001f480\U0001f525 **Cadence force-fire: `{state_name}`** "
            f"(MUST tier + 10% headroom)\n{node_lines}",
        )
        logger.info(
            "Cadence force-fire: %s fired by %s in %s:%s | deltas=%s",
            state_name,
            msg.user_id,
            msg.platform,
            msg.channel_id,
            applied,
        )
    except Exception as e:
        logger.error("Cadence force-fire failed: %s", e, exc_info=True)
        await platform.send(
            msg.channel_id,
            f"\u26a0\ufe0f Force-fire failed: {e}",
        )
    finally:
        if db12 is not None:
            try:
                await db12.aclose()
                # The pool was supplied explicitly, so it is disconnected
                # explicitly as well.
                await db12.connection_pool.disconnect()
            except Exception as ex:
                logger.warning("Failed during Database 12 cleanup for channel %s: %s", msg.channel_id, str(ex))
            else:
                # Report success only after the cleanup actually ran.
                logger.info("Successfully closed and disconnected Database 12 connection pool for channel: %s", msg.channel_id)
    return True
# Exact (stripped, lower-cased) command text -> per-user LLM setting to
# clear.  Both spellings are accepted: ``!clear<name>`` and ``!<name> clear``.
_USER_LLM_CLEAR_COMMANDS: dict[str, str] = {
"!clearapiurl": "api_url",
"!clearmodel": "model",
"!clearapikey": "api_key",
"!cleartogglespecific": "toggle_specific",
"!apiurl clear": "api_url",
"!model clear": "model",
"!apikey clear": "api_key",
"!togglespecific clear": "toggle_specific",
}
async def _handle_user_llm_command(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> bool:
    """Handle the per-user LLM override commands for this channel.

    Recognised commands are ``!apiurl``, ``!model``, ``!apikey`` and
    ``!togglespecific``, each followed by a value or by ``clear``, plus
    the exact clear spellings in ``_USER_LLM_CLEAR_COMMANDS``.  A bare
    command word replies with its usage text.

    Returns ``True`` when the message was handled (inference skipped) and
    ``False`` for anything that is not one of these commands, including
    look-alikes such as ``!modelfoo``.
    """
    text = (msg.text or "").strip()
    if not text:
        return False
    tl = text.lower()

    # Stored field -> label used in the "cleared" confirmations.
    labels = {
        "api_url": "custom API base URL",
        "model": "custom model name",
        "api_key": "custom API key",
        "toggle_specific": "proxy-specific routing toggle",
    }
    # Command word -> (stored field, usage text for the bare command).
    setters = {
        "!apiurl": (
            "api_url",
            "Usage: `!apiurl <https://...>` (OpenAI-compatible base) "
            "or `!apiurl clear` / `!clearapiurl`.",
        ),
        "!model": (
            "model",
            "Usage: `!model <model-id>` or `!model clear` / `!clearmodel`.",
        ),
        "!apikey": (
            "api_key",
            "Usage: `!apikey <key>` or `!apikey clear` / `!clearapikey`. "
            "**Send keys in DM** when possible — public channels log history.",
        ),
        "!togglespecific": (
            "toggle_specific",
            "Usage: `!togglespecific <on/off>` or `!togglespecific clear` / `!cleartogglespecific`.",
        ),
    }

    # Decide whether this is one of our commands *before* looking at
    # Redis, so unrelated messages that merely share a prefix are never
    # answered with the "Redis is not configured" notice.
    clear_field = self._USER_LLM_CLEAR_COMMANDS.get(tl)
    word, sep, remainder = text.partition(" ")
    setter = setters.get(word.lower())
    if clear_field is None and setter is None:
        return False

    redis = (
        self.message_cache.redis_client if self.message_cache is not None else None
    )
    if redis is None:
        await platform.send(
            msg.channel_id,
            "⚠️ Redis is not configured — per-user LLM settings are unavailable.",
        )
        return True

    async def _clear_field(field: str, *, exact: bool) -> None:
        # Remove one stored setting and confirm it.  ``exact`` selects
        # the wording used for the exact-match clear commands.
        err = await user_llm_config.clear_user_llm_field(
            redis,
            msg.user_id,
            msg.platform,
            msg.channel_id,
            field,
        )
        if err:
            await platform.send(msg.channel_id, f"⚠️ {err}")
            return
        suffix = " (back to bot defaults)" if exact else ""
        await platform.send(
            msg.channel_id,
            f"✅ Cleared your {labels.get(field, field)} "
            f"for this channel{suffix}.",
        )
        logger.info(
            "User LLM clear: field=%s user=%s %s:%s",
            field,
            msg.user_id,
            msg.platform,
            msg.channel_id,
        )

    async def _apply_field(field: str, raw_value: str) -> None:
        # Store one setting and confirm it.
        err = await user_llm_config.set_user_llm_field(
            redis,
            msg.user_id,
            msg.platform,
            msg.channel_id,
            field,
            raw_value,
            config=self.config,
        )
        if err:
            await platform.send(msg.channel_id, f"⚠️ {err}")
            return
        if field == "api_url":
            confirmation = (
                "✅ Saved your custom API base URL for this channel. "
                "It will be used for your messages here (OpenAI-compatible "
                "`/chat/completions`)."
            )
        elif field == "model":
            confirmation = "✅ Saved your custom model name for this channel."
        elif field == "toggle_specific":
            status = "ENABLED" if raw_value == "1" else "DISABLED"
            confirmation = (
                f"✅ Set proxy-specific routing to **{status}** for this channel."
            )
        else:
            confirmation = (
                "✅ Saved your custom API key for this channel (encrypted)."
            )
        await platform.send(msg.channel_id, confirmation)
        logger.info(
            "User LLM set: field=%s user=%s %s:%s",
            field,
            msg.user_id,
            msg.platform,
            msg.channel_id,
        )

    if clear_field is not None:
        await _clear_field(clear_field, exact=True)
        return True

    field, usage = setter
    if not sep:
        # Bare command word with no argument.
        await platform.send(msg.channel_id, usage)
        return True

    arg = remainder.strip()
    if arg.lower() == "clear":
        # Reached only when the spacing differs from the exact-match
        # table (e.g. two spaces before "clear").
        await _clear_field(field, exact=False)
        return True

    if field == "api_key" and not (msg.extra or {}).get("is_dm", False):
        await platform.send(
            msg.channel_id,
            "⚠️ **Security:** You are not in a DM. Your key may be visible "
            "in channel history — prefer DM. Saving anyway as requested.",
        )
    if field == "toggle_specific":
        # Anything that is not an explicit "on" spelling switches it off.
        arg = "1" if arg.lower() in ("on", "true", "1", "yes", "enable") else "0"

    await _apply_field(field, arg)
    return True
# ------------------------------------------------------------------
# Batch handling
# ------------------------------------------------------------------
[docs]
async def handle_batch(
    self,
    batch: MessageBatch,
    representative: IncomingMessage,
    platform: PlatformAdapter,
) -> None:
    """Process a batch of rapid-succession messages as a single LLM call.

    Combines the text of every message in *batch* into one
    :class:`IncomingMessage` (using *representative* as the base)
    and delegates to :meth:`handle_message` with batch metadata set
    so that the system prompt includes the ``batch_response_context``
    section.

    Messages that match :func:`~.bot_commands.is_immediate_bot_command`
    are handled immediately (not merged into the synthetic batch text).

    Args:
        batch: The queued messages to process together. Each queued
            message may carry its originating :class:`IncomingMessage`
            under ``extra["incoming_message"]``.
        representative: Message used as the template for the synthetic
            combined message, and as the fallback whenever a queued
            message has no usable ``incoming_message`` entry.
        platform: Adapter passed through to the downstream handlers.
    """

    def _source_message(qm: QueuedMessage) -> IncomingMessage:
        # ``dict.get(key, default)`` only falls back when the key is
        # absent; ``or`` also covers a key that is present but holds
        # ``None``, which would otherwise blow up on attribute access.
        return qm.extra.get("incoming_message") or representative

    # Fast path: a single-message batch needs no merging at all.
    if batch.size == 1:
        original = _source_message(batch.messages[0])
        if is_immediate_bot_command(original.text):
            await self.handle_immediate_command(original, platform)
        else:
            await self.handle_message(original, platform)
        return

    # Dispatch immediate commands right away; everything else is kept
    # for the combined LLM turn.
    llm_queued: list[QueuedMessage] = []
    for qm in batch.messages:
        orig_cmd: IncomingMessage | None = qm.extra.get("incoming_message")
        if orig_cmd is not None and is_immediate_bot_command(orig_cmd.text):
            await self.handle_immediate_command(orig_cmd, platform)
        else:
            llm_queued.append(qm)

    if not llm_queued:
        return
    if len(llm_queued) == 1:
        # Only one message left after removing commands: handle it as a
        # normal (non-batch) message.
        await self.handle_message(_source_message(llm_queued[0]), platform)
        return

    # Render one pre-formatted line per message and pool all attachments.
    lines: list[str] = []
    all_attachments: list[Attachment] = []
    for qm in llm_queued:
        orig: IncomingMessage | None = qm.extra.get("incoming_message")
        if orig is None:
            lines.append(qm.text)
            continue
        ts = orig.timestamp.isoformat()
        prefix = (
            f"[{ts}] {orig.user_name} ({orig.user_id})"
            f" [Message ID: {orig.message_id}]"
        )
        if orig.reply_to_id:
            prefix += f" [Replying to: {orig.reply_to_id}]"
        if orig.reactions:
            prefix += f" [Reactions: {orig.reactions}]"
        prefix += " : "
        # Text-less messages contribute no line, but their attachments
        # are still carried over.
        if orig.text:
            lines.append(prefix + orig.text)
        all_attachments.extend(orig.attachments)
    combined_text = "\n".join(line for line in lines if line)

    # Inherit is_addressed from the batch: if ANY message was
    # addressed, the combined response should be too. Otherwise
    # the batch must go through the normal proactive gate.
    # NOTE(review): this scans ``batch.messages``, so messages already
    # dispatched above as immediate commands still count — confirm
    # that is intended.
    batch_addressed = any(
        _source_message(qm).is_addressed for qm in batch.messages
    )

    # IDs of the messages actually merged into the combined text,
    # skipping falsy IDs.
    batch_message_ids = [
        message_id
        for message_id in (
            _source_message(qm).message_id for qm in llm_queued
        )
        if message_id
    ]

    synthetic = IncomingMessage(
        platform=representative.platform,
        channel_id=representative.channel_id,
        user_id=representative.user_id,
        user_name=representative.user_name,
        text=combined_text,
        is_addressed=batch_addressed,
        attachments=all_attachments,
        channel_name=representative.channel_name,
        timestamp=representative.timestamp,
        message_id=representative.message_id,
        reply_to_id=representative.reply_to_id,
        extra={
            **representative.extra,
            "is_batch_response": True,
            "batch_pre_formatted": True,
            "batch_size": batch.size,
            "batch_authors": batch.unique_authors(),
            "batch_message_ids": batch_message_ids,
        },
    )
    logger.info(
        "Processing batch of %d messages on %s:%s",
        batch.size,
        synthetic.platform,
        synthetic.channel_id,
    )
    await self.handle_message(synthetic, platform)
# ------------------------------------------------------------------
# Auto-disable proactive on 403 Forbidden
# ------------------------------------------------------------------
async def _disable_proactive_on_forbidden(
    self,
    platform_name: str,
    channel_id: str,
) -> None:
    """Forward to :func:`.proactive_gates.run_disable_proactive_on_forbidden`.

    Thin wrapper: the implementation lives in the ``proactive_gates``
    module, which is imported when this method is called rather than at
    module load. This processor instance is passed as the first argument,
    followed by *platform_name* and *channel_id*.
    """
    from .proactive_gates import run_disable_proactive_on_forbidden as _impl

    await _impl(self, platform_name, channel_id)
[docs]
async def run_channel_heartbeat_once(
    self,
    platform: PlatformAdapter,
    channel_id: str,
    heartbeat_client: OpenRouterClient,
) -> bool:
    """Background channel heartbeat (flash model, no tools).

    Runs :func:`.channel_heartbeat.execute_channel_heartbeat` while
    holding this channel's lock.

    Args:
        platform: Adapter handed through to the heartbeat implementation.
        channel_id: Channel whose lock is taken and whose heartbeat runs.
        heartbeat_client: LLM client handed through to the heartbeat
            implementation.

    Returns:
        The result of ``execute_channel_heartbeat``, or ``False`` when a
        timeout is raised (logged as the channel lock being busy).
    """
    from .channel_heartbeat import execute_channel_heartbeat

    # Serialize the heartbeat against the main inference turn for this
    # channel so it never reads or appends history concurrently with a live
    # turn. Skips this cycle if the lock cannot be acquired in time.
    try:
        async with self._channel_lock(channel_id):
            return await execute_channel_heartbeat(
                self,
                platform,
                channel_id,
                heartbeat_client,
            )
    # ``asyncio.TimeoutError`` only became an alias of the built-in
    # ``TimeoutError`` in Python 3.11; naming both keeps the skip path
    # working on older interpreters too.
    # NOTE(review): the ``try`` also covers the heartbeat body, so a timeout
    # raised there is reported as a busy lock as well — confirm intended.
    except (TimeoutError, asyncio.TimeoutError):
        logger.info(
            "Skipped channel heartbeat for %s — channel lock busy",
            channel_id,
        )
        return False
# ------------------------------------------------------------------
# Proactive response evaluation
# ------------------------------------------------------------------
async def _should_respond_proactively(
    self,
    msg: IncomingMessage,
) -> bool:
    """Return the verdict of :func:`.proactive_gates.run_should_respond_proactively`.

    Thin wrapper: the gate logic lives in the ``proactive_gates`` module,
    imported when this method is called. It receives this processor
    instance and *msg*, and its result is returned unchanged.
    """
    from .proactive_gates import run_should_respond_proactively as _evaluate

    verdict = await _evaluate(self, msg)
    return verdict
# ------------------------------------------------------------------
# History backfill
# ------------------------------------------------------------------
async def _backfill_history(
    self,
    history_key: str,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> None:
    """Forward to :func:`.history_backfill.run_backfill_history`.

    Thin wrapper: the backfill logic lives in the ``history_backfill``
    module, imported when this method is called. Arguments are passed on
    positionally as ``(self, history_key, msg, platform)``.
    """
    from .history_backfill import run_backfill_history as _impl

    await _impl(self, history_key, msg, platform)
async def _backfill_from_redis(
    self,
    platform_name: str,
    channel_id: str,
    history_key: str,
) -> int:
    """Return the result of :func:`.history_backfill.run_backfill_from_redis`.

    Thin wrapper: the helper is imported when this method is called and
    receives ``(self, platform_name, channel_id, history_key)``. Its
    integer result is returned unchanged (presumably a count of
    backfilled messages — confirm against the helper).
    """
    from .history_backfill import run_backfill_from_redis as _impl

    outcome = await _impl(self, platform_name, channel_id, history_key)
    return outcome
async def _backfill_from_platform(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
    history_key: str,
) -> None:
    """Forward to :func:`.history_backfill.run_backfill_from_platform`.

    Thin wrapper: the helper is imported when this method is called and
    receives ``(self, msg, platform, history_key)``.
    """
    from .history_backfill import run_backfill_from_platform as _impl

    await _impl(self, msg, platform, history_key)
# ------------------------------------------------------------------
# Embedding helpers
# ------------------------------------------------------------------
_EMBED_BATCH_SIZE = 50  # match embedding_queue.API_BATCH_LIMIT

async def _ensure_embeddings(
    self,
    items: list[tuple[str, str]],
) -> None:
    """Ensure every ``(redis_key, text)`` pair gets an embedding.

    When the :class:`~embedding_queue.EmbeddingBatchQueue` is
    available the items are enqueued there (the queue handles
    batching internally). Otherwise, embeddings are generated
    directly via :meth:`OpenRouterClient.embed_batch` in chunks
    and written back to Redis in a pipeline.

    The direct path is best-effort: a chunk whose embedding call or
    Redis write fails is logged as a warning and skipped, and the
    remaining chunks are still processed. Nothing is done when *items*
    is empty, or when neither the queue nor the message cache is set.

    Args:
        items: ``(redis_key, text)`` pairs; each embedding is stored in
            the ``"embedding"`` hash field of its key as float32 bytes.
    """
    if not items:
        return
    if self._embedding_queue is not None:
        await self._embedding_queue.enqueue_many(items)
        return
    if self.message_cache is None:
        return

    redis = self.message_cache.redis_client
    batch_size = self._EMBED_BATCH_SIZE
    for start in range(0, len(items), batch_size):
        chunk = items[start : start + batch_size]
        try:
            embeddings = await self.openrouter.embed_batch(
                [text for _, text in chunk],
                self.config.embedding_model,
            )
        except Exception:
            logger.warning(
                "Direct batch embedding failed for %d items",
                len(chunk),
                exc_info=True,
            )
            continue

        pipe = redis.pipeline()
        queued = 0
        for (key, _), emb in zip(chunk, embeddings):
            blob = np.array(emb, dtype=np.float32).tobytes()
            pipe.hset(key, "embedding", blob)
            queued += 1
        # ``zip`` stops at the shorter input, so a short result from the
        # embedding call would otherwise drop items without any trace.
        if queued != len(chunk):
            logger.warning(
                "Only %d of %d items received an embedding from the "
                "batch call; the rest were left without one",
                queued,
                len(chunk),
            )
        try:
            await pipe.execute()
        except Exception:
            logger.warning(
                "Failed to write %d embeddings to Redis",
                queued,
                exc_info=True,
            )
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
@staticmethod
async def _build_multimodal_content(
    text: str,
    attachments: list[Attachment],
) -> list[dict[str, Any]]:
    """Turn a message's text and attachments into OpenRouter content parts.

    Attachments are split with ``_is_text_decodable``:

    * Text-decodable files are decoded (UTF-8, falling back to Latin-1),
      passed through ``maybe_truncate_unreadable``, and appended to the
      text body between ``[Attached file: …]`` / ``[End of file: …]``
      markers, so the LLM can read them regardless of proxy multimodal
      support.
    * Every other attachment is converted with ``media_to_content_parts``
      and kept as multimodal parts. The exception is a file larger than
      ``_MAX_USER_TEXT_ATTACHMENT_BYTES`` that is text-decodable once the
      size cap is lifted (``max_bytes=None``): it is dropped and replaced
      by a one-line notice appended to the text body.

    Returns:
        The text part (when the body is non-empty) followed by the
        multimodal parts; a single empty text part if there is nothing
        else to return.
    """
    inline_files: list[Attachment] = []
    media_files: list[Attachment] = []
    for item in attachments:
        bucket = (
            inline_files
            if _is_text_decodable(item.mimetype, item.filename, item.data)
            else media_files
        )
        bucket.append(item)

    # Build the text body: the user's text first, then one delimited
    # section per inlined file, in attachment order.
    segments: list[str] = [text or ""]
    for item in inline_files:
        raw = item.data
        try:
            content = raw.decode("utf-8")
        except UnicodeDecodeError:
            # Latin-1 maps every byte value, so this decode cannot fail.
            content = raw.decode("latin-1")
        content, was_truncated = await maybe_truncate_unreadable(
            content,
            item.filename,
            item.mimetype,
        )
        if was_truncated:
            logger.info(
                "Truncated unreadable attachment %s (%s, %d bytes) "
                "for LLM payload",
                item.filename,
                item.mimetype,
                len(raw),
            )
        segments.append(
            f"\n\n[Attached file: {item.filename}]\n"
            f"{content}\n"
            f"[End of file: {item.filename}]"
        )
    text_body = "".join(segments)

    for item in inline_files:
        logger.debug(
            "Inlined text file %s (%s, %d bytes) into message body",
            item.filename,
            item.mimetype,
            len(item.data),
        )

    media_parts: list[dict[str, Any]] = []
    notices: list[str] = []
    for item in media_files:
        oversized = len(item.data) > _MAX_USER_TEXT_ATTACHMENT_BYTES
        if oversized and _is_text_decodable(
            item.mimetype, item.filename, item.data, max_bytes=None
        ):
            # Text that is too big to inline: keep a notice, drop the data.
            size_mb = len(item.data) / (1024 * 1024)
            notices.append(
                f"[Attachment: {item.filename} ({size_mb:.1f} MB) — "
                f"file too large to include in context, contents stripped]"
            )
            logger.warning(
                "Stripped large text attachment %s (%s, %.1f MB) from LLM payload",
                item.filename,
                item.mimetype,
                size_mb,
            )
            continue
        converted = await media_to_content_parts(
            item.data,
            item.mimetype,
            item.filename,
        )
        media_parts.extend(converted)
        logger.debug(
            "Kept %s (%s, %d bytes) as %d multimodal part(s)",
            item.filename,
            item.mimetype,
            len(item.data),
            len(converted),
        )

    if notices:
        text_body = text_body + "\n\n" + "\n".join(notices)

    result: list[dict[str, Any]] = []
    if text_body:
        result.append({"type": "text", "text": text_body})
    result.extend(media_parts)
    if not result:
        # Always hand back at least one part.
        return [{"type": "text", "text": ""}]
    return result
async def _download_and_patch_video(
    self,
    url: str,
    user_id: str,
    history_key: str,
    message_id: str,
    metadata: dict[str, Any],
    cookies_text: str | None = None,
    downloading_annotation: str = "",
) -> None:
    """Forward to :func:`.video_history_patch.run_download_and_patch_video`.

    Thin wrapper: the helper is imported when this method is called.
    ``self``, *url*, *user_id*, *history_key*, *message_id* and *metadata*
    are passed positionally; *cookies_text* and *downloading_annotation*
    are passed by keyword.
    """
    from .video_history_patch import run_download_and_patch_video as _impl

    optional_args = {
        "cookies_text": cookies_text,
        "downloading_annotation": downloading_annotation,
    }
    await _impl(
        self,
        url,
        user_id,
        history_key,
        message_id,
        metadata,
        **optional_args,
    )
async def _extract_knowledge_from_message(
    self,
    msg: IncomingMessage,
) -> None:
    """Run per-message knowledge extraction without ever raising.

    Delegates to :func:`kg_extraction.extract_from_message`, passing the
    message fields, the LLM client, the knowledge-graph manager, the Redis
    client from the message cache (``None`` when there is no cache), the
    configured per-user limit, and the optional ``kg_extraction_model``
    override from the config.

    The original docstring describes three cheap gates that run before
    any LLM call: length, heuristic regex, and per-user rate limit. They
    are not visible in this method; presumably ``extract_from_message``
    implements them.

    Any :class:`Exception`, including a failed import, is logged as a
    warning and swallowed.
    """
    try:
        from kg_extraction import extract_from_message

        cache = self.message_cache
        redis = None if cache is None else cache.redis_client
        # Normalize a missing or falsy guild id to None.
        guild_id = msg.extra.get("guild_id") or None
        model_override = getattr(self.config, "kg_extraction_model", None)

        await extract_from_message(
            message_text=msg.text,
            user_id=msg.user_id,
            user_name=msg.user_name,
            channel_id=msg.channel_id,
            guild_id=guild_id,
            openrouter=self.openrouter,
            kg_manager=self.kg_manager,
            redis=redis,
            per_user_limit=self.config.kg_per_user_extraction_limit,
            override_model=model_override,
        )
    except Exception:
        logger.warning(
            "Per-message knowledge extraction failed",
            exc_info=True,
        )