Source code for main

"""Entry point for the multi-platform LLM bot.

Handles tool auto-loading, prompt rendering, platform instantiation,
and the web management GUI.
"""

from __future__ import annotations

# Load .env before any other imports that may read os.environ
try:
    from dotenv import load_dotenv
    load_dotenv()
except ImportError:
    # python-dotenv is optional; without it we rely on whatever variables
    # are already present in the process environment.
    pass

import os

# Opt out of third-party telemetry unless the operator explicitly enabled
# it; setdefault preserves any value already exported in the environment.
os.environ.setdefault("ANONYMIZED_TELEMETRY", "False")

import asyncio
import json
import logging
import sys
from typing import Any, Callable

import uvicorn

from classifiers.vector_classifier import VectorClassifier, initialize_tool_embeddings_from_file
from config import Config, PlatformConfig
from conversation import ConversationManager
from embedding_queue import EmbeddingBatchQueue
from media_cache import MediaCache
from knowledge_graph import KnowledgeGraphManager
from message_cache import MessageCache
from message_processor import MessageProcessor
from message_queue import MessageQueue, QueuedMessage, MessageBatch, QueueItem
from openrouter_client import OpenRouterClient
from platforms.base import IncomingMessage, PlatformAdapter
from prompt_renderer import PromptRenderer
from rag_system.auto_search import RAGAutoSearchManager
from task_manager import TaskManager
from web_search_context import WebSearchContextManager
from threadweave import ThreadweaveManager
from tool_loader import load_tools
from tools import ToolRegistry
from background_tasks import build_scheduler, BackgroundScheduler, startup_channel_backfill
from init_redis_indexes import ensure_indexes
from status_manager import StatusManager
from tools.scheduled_prompt import set_bot_runner as set_scheduled_prompt_runner
from web import app as web_app, set_bot_runner, set_config

# Root logging configuration: timestamped, level-tagged records for all
# module loggers in the process.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# matrix-nio is very chatty at INFO; keep only warnings and above.
logging.getLogger("nio").setLevel(logging.WARNING)

# Scrub API keys from every record before it reaches any handler.
from log_redaction import ApiKeyRedactionFilter
logging.getLogger().addFilter(ApiKeyRedactionFilter())

logger = logging.getLogger(__name__)


# ------------------------------------------------------------------
# Platform factory
# ------------------------------------------------------------------

[docs] def create_platform( pcfg: PlatformConfig, message_handler: Callable[..., Any], media_cache: MediaCache | None = None, message_update_handler: Callable[..., Any] | None = None, message_delete_handler: Callable[..., Any] | None = None, config: Config | None = None, reaction_update_handler: Callable[..., Any] | None = None, ) -> PlatformAdapter: """Instantiate the correct :class:`PlatformAdapter` for *pcfg*.""" if pcfg.type == "matrix": from platforms.matrix import MatrixPlatform return MatrixPlatform( message_handler=message_handler, homeserver=pcfg.get("homeserver", "https://matrix.org"), user_id=pcfg.get("user_id", ""), password=pcfg.get("password", ""), store_path=pcfg.get("store_path", "nio_store"), credentials_file=pcfg.get( "credentials_file", "matrix_credentials.json", ), media_cache=media_cache, config=config, ) if pcfg.type == "discord": from platforms.discord import DiscordPlatform return DiscordPlatform( message_handler=message_handler, token=pcfg.get("token", ""), config=config, media_cache=media_cache, message_update_handler=message_update_handler, message_delete_handler=message_delete_handler, reaction_update_handler=reaction_update_handler, ) if pcfg.type == "discord-self": from platforms.discord_self import DiscordSelfPlatform return DiscordSelfPlatform( message_handler=message_handler, token=pcfg.get("token", ""), prefix=pcfg.get("prefix", "!"), config=config, media_cache=media_cache, message_update_handler=message_update_handler, message_delete_handler=message_delete_handler, reaction_update_handler=reaction_update_handler, ) if pcfg.type == "webchat": from platforms.webchat import WebChatPlatform return WebChatPlatform( message_handler=message_handler, ) raise ValueError(f"Unknown platform type: {pcfg.type!r}")
# ------------------------------------------------------------------ # BotRunner – manages the multi-platform lifecycle # ------------------------------------------------------------------
[docs] class BotRunner: """Encapsulates bot state so the web GUI can start / stop individual platforms on demand. """
def __init__(
    self,
    cfg: Config,
    tool_registry: ToolRegistry,
) -> None:
    """Build every shared component and one adapter per enabled platform.

    Args:
        cfg: Bot configuration object.
        tool_registry: Registry of the already-loaded tools.
    """
    self.cfg = cfg
    self.tool_registry = tool_registry

    # Media cache shared across all platforms.
    self.media_cache = MediaCache(
        cache_dir=cfg.media_cache_dir,
        max_size_mb=cfg.media_cache_max_mb,
    )

    # The prompt renderer is seeded with the tool catalogue so the
    # system prompt can list every available tool.
    tool_info = [
        {"name": td.name, "description": td.description}
        for td in self.tool_registry._tools.values()
    ]
    renderer = PromptRenderer(
        cfg.system_prompt_file,
        default_extras={"tools": tool_info},
    )
    self.conversation_mgr = ConversationManager(
        prompt_renderer=renderer,
        max_history=cfg.max_history,
    )

    # Apply the tool permission whitelist from config, if any.
    if cfg.tool_permissions:
        self.tool_registry.set_permissions(cfg.tool_permissions)

    self.openrouter = OpenRouterClient(
        api_key=cfg.api_key,
        model=cfg.model,
        temperature=cfg.temperature,
        max_tokens=cfg.max_tokens,
        tool_registry=self.tool_registry,
        base_url=cfg.llm_base_url,
        gemini_api_key=cfg.gemini_api_key,
    )

    # Redis-backed components are optional; all of them share the one
    # connection owned by the message cache.
    self.message_cache: MessageCache | None = None
    self.kg_manager: KnowledgeGraphManager | None = None
    self._auto_search: RAGAutoSearchManager | None = None
    self._web_search: WebSearchContextManager | None = None
    if cfg.redis_url:
        self.message_cache = MessageCache(
            redis_url=cfg.redis_url,
            openrouter_client=self.openrouter,
            embedding_model=cfg.embedding_model,
            ssl_kwargs=cfg.redis_ssl_kwargs(),
        )
        logger.info(
            "Redis message cache enabled (model: %s)",
            cfg.embedding_model,
        )
        from gemini_embed_pool import init_quota_tracking
        init_quota_tracking(self.message_cache.redis_client)

        # Knowledge graph manager shares the same Redis connection.
        self.kg_manager = KnowledgeGraphManager(
            redis_client=self.message_cache.redis_client,
            openrouter=self.openrouter,
            embedding_model=cfg.embedding_model,
            admin_user_ids=set(cfg.admin_user_ids) if cfg.admin_user_ids else None,
        )
        logger.info("Knowledge graph manager enabled")

        # RAG auto-search manager.
        self._auto_search = RAGAutoSearchManager(
            self.message_cache.redis_client,
        )
        logger.info("RAG auto-search manager enabled")

        # Web search context manager (auto-injects Brave results).
        self._web_search = WebSearchContextManager(
            self.message_cache.redis_client,
            default_api_key=cfg.api_key,
            config=cfg,
        )
        logger.info("Web search context manager enabled")

    # Threadweave persistent knowledge system (requires Redis).
    self.threadweave: ThreadweaveManager | None = None
    if cfg.redis_url:
        self.threadweave = ThreadweaveManager(
            redis_client=self.message_cache.redis_client,
            openrouter=self.openrouter,
            embedding_model=cfg.embedding_model,
            admin_user_ids=(
                set(cfg.admin_user_ids) if cfg.admin_user_ids else None
            ),
            dna_vault_path=cfg.dna_vault_path,
        )
        logger.info("Threadweave manager enabled (vault: %s)", cfg.dna_vault_path)

    # Vector classifier for tool selection (requires Redis).
    self._classifier: VectorClassifier | None = None
    if self.message_cache is not None:
        self._classifier = VectorClassifier(
            redis_client=self.message_cache.redis_client,
            api_key=cfg.openrouter_api_key,
        )
        logger.info("Vector classifier enabled")

    # Task manager for fire-and-forget tool execution.
    redis_client = (
        self.message_cache.redis_client
        if self.message_cache is not None
        else None
    )
    self.task_manager = TaskManager(
        timeout=10.0,
        redis=redis_client,
    )
    self.tool_registry.task_manager = self.task_manager
    logger.info("Task manager enabled (timeout=10.0s)")

    # Deferred embedding batch queue (requires Redis).
    self.embedding_queue: EmbeddingBatchQueue | None = None
    if self.message_cache is not None:
        self.embedding_queue = EmbeddingBatchQueue(
            openrouter=self.openrouter,
            redis=self.message_cache.redis_client,
            model=cfg.embedding_model,
            flush_interval=cfg.embedding_flush_interval,
            max_batch_size=cfg.embedding_batch_size,
        )
        logger.info(
            "Embedding batch queue enabled (flush=%.1fs, batch=%d)",
            cfg.embedding_flush_interval,
            cfg.embedding_batch_size,
        )

    # Status manager (the Discord adapter is wired in below, once built).
    self.status_manager = StatusManager(openrouter=self.openrouter)

    self.processor = MessageProcessor(
        config=cfg,
        conversation_manager=self.conversation_mgr,
        openrouter=self.openrouter,
        message_cache=self.message_cache,
        kg_manager=self.kg_manager,
        auto_search=self._auto_search,
        task_manager=self.task_manager,
        classifier=self._classifier,
        threadweave=self.threadweave,
        embedding_queue=self.embedding_queue,
        status_manager=self.status_manager,
        web_search=self._web_search,
    )

    # Per-channel message queue with temporal batching.
    self.message_queue = MessageQueue(
        default_batch_window=cfg.batch_window,
        max_batch_size=cfg.max_batch_size,
        redis=redis_client,
    )
    logger.info(
        "Message queue enabled (window=%.1fs, max_batch=%d)",
        cfg.batch_window,
        cfg.max_batch_size,
    )

    # Background task scheduler.
    self.scheduler: BackgroundScheduler = build_scheduler(
        redis=redis_client,
        kg_manager=self.kg_manager,
        openrouter=self.openrouter,
    )

    # channel_key -> adapter, for routing replies out of the queue.
    self._channel_adapters: dict[str, PlatformAdapter] = {}

    # Instantiate platform adapters for all enabled platforms.
    self.adapters: list[PlatformAdapter] = []
    # The processor holds this SAME list object, so adapters appended
    # later remain visible via ToolContext.all_adapters.
    self.processor._all_adapters = self.adapters
    # Also wire it into the prompt context builder so the system prompt
    # can expose the bot's identity on every active platform.
    self.processor._ctx_builder.all_adapters = self.adapters

    for pcfg in cfg.platforms:
        if not pcfg.enabled:
            logger.info("Platform %s is disabled – skipping", pcfg.type)
            continue
        try:
            adapter = create_platform(
                pcfg,
                self._enqueue_message,
                self.media_cache,
                message_update_handler=self.processor.handle_message_update,
                message_delete_handler=self.processor.handle_message_delete,
                config=cfg,
                reaction_update_handler=self.processor.handle_reaction_update,
            )
            self.adapters.append(adapter)
            logger.info("Created adapter for platform: %s", pcfg.type)
        except Exception:
            logger.exception(
                "Failed to create adapter for platform %s",
                pcfg.type,
            )

    # Wire the cancel callback so slash commands can stop responses.
    for adapter in self.adapters:
        adapter._cancel_callback = self._cancel_response

    # The webchat adapter is always available when the web GUI runs.
    if not self.get_adapter("webchat"):
        try:
            from platforms.webchat import WebChatPlatform
            webchat = WebChatPlatform(message_handler=self._enqueue_message)
            webchat._cancel_callback = self._cancel_response
            self.adapters.append(webchat)
            logger.info("Auto-injected webchat platform adapter")
        except Exception:
            logger.warning("Could not inject webchat adapter", exc_info=True)

    # Wire the Discord adapter into the StatusManager for presence updates.
    discord_adapter = self.get_adapter("discord")
    if discord_adapter is not None:
        self.status_manager._discord = discord_adapter

    # Legacy compat: expose credentials for the Matrix web login flow.
    self._matrix_adapter: Any = None
    for a in self.adapters:
        if a.name == "matrix":
            self._matrix_adapter = a
            break
@property def is_running(self) -> bool: """Check whether is running. Returns: bool: True on success, False otherwise. """ return any(a.is_running for a in self.adapters) # Expose the Matrix client for the legacy web GUI status endpoint @property def client(self) -> Any: """Client. Returns: Any: The result. """ if self._matrix_adapter is not None: return self._matrix_adapter.client return None # Legacy compat for web.py login @property def credentials(self) -> dict | None: """Credentials. """ if self._matrix_adapter is not None: return self._matrix_adapter.credentials return None @credentials.setter def credentials(self, value: dict | None) -> None: """Credentials. Args: value (dict | None): Value to set. """ if self._matrix_adapter is not None: self._matrix_adapter.credentials = value
async def start(self) -> None:
    """Start all platform adapters concurrently.

    Also verifies Redis and knowledge-graph indexes, warms the tool
    classifier, starts the embedding queue / scheduler / status
    manager, and recovers responses interrupted by the last shutdown.
    """
    # Ensure RediSearch vector indexes exist before processing.
    if self.message_cache is not None:
        try:
            await ensure_indexes(
                self.message_cache.redis_client,
            )
            logger.info("RediSearch indexes verified")
        except Exception:
            logger.warning(
                "Could not create RediSearch indexes "
                "(is the RediSearch module loaded?)",
                exc_info=True,
            )

    # Ensure FalkorDB knowledge graph indexes.
    if self.kg_manager is not None:
        try:
            await self.kg_manager.ensure_indexes()
            logger.info("Knowledge graph indexes verified")
        except Exception:
            logger.warning(
                "Could not create knowledge graph indexes",
                exc_info=True,
            )

    # Initialize tool embeddings in Redis (no-op if already present).
    if self._classifier is not None:
        import os
        index_file = os.path.join(
            os.path.dirname(__file__),
            "classifiers",
            "tool_index_data.json",
        )
        if os.path.exists(index_file):
            try:
                await initialize_tool_embeddings_from_file(
                    index_file_path=index_file,
                    redis_client=self.message_cache.redis_client,
                    api_key=self.cfg.openrouter_api_key,
                )
                # Pre-warm the classifier's in-memory cache from Redis.
                await self._classifier._load_tool_embeddings()
            except Exception:
                logger.warning(
                    "Could not initialize tool embeddings",
                    exc_info=True,
                )
        else:
            logger.info(
                "No tool_index_data.json found; run "
                "'python -m classifiers.build_tool_index' to generate it",
            )

    if self.embedding_queue is not None:
        await self.embedding_queue.start()

    # Load the media cache index from disk (non-blocking).
    await self.media_cache.ensure_loaded()

    if not self.adapters:
        logger.warning("No platform adapters configured")
        return

    results = await asyncio.gather(
        *(a.start() for a in self.adapters),
        return_exceptions=True,
    )
    for adapter, result in zip(self.adapters, results):
        if isinstance(result, Exception):
            logger.error(
                "Failed to start %s: %s",
                adapter.name,
                result,
            )

    # Load user-donated embed keys from Redis into the live pool.
    if self.message_cache is not None:
        from gemini_embed_pool import reload_pool
        await reload_pool()

    # Backfill recent channel history and enqueue missing embeddings.
    if self.message_cache is not None and self.adapters:
        try:
            await startup_channel_backfill(
                redis=self.message_cache.redis_client,
                message_cache=self.message_cache,
                embedding_queue=self.embedding_queue,
                adapters=self.adapters,
            )
        except Exception:
            logger.warning(
                "Startup channel backfill failed",
                exc_info=True,
            )

    await self.scheduler.start()
    self.status_manager.start()

    # Re-process any responses interrupted by the last shutdown.
    try:
        await self.processor.recover_pending_responses(
            self._enqueue_message,
            self.adapters,
        )
    except Exception:
        logger.warning(
            "Pending response recovery failed",
            exc_info=True,
        )
[docs] async def start_platform(self, platform_name: str) -> None: """Start a specific platform adapter by name.""" for adapter in self.adapters: if adapter.name == platform_name: await adapter.start() return raise ValueError(f"No adapter found for platform: {platform_name}")
[docs] async def stop(self) -> None: """Stop all platform adapters and clean up.""" results = await asyncio.gather( *(a.stop() for a in self.adapters), return_exceptions=True, ) for adapter, result in zip(self.adapters, results): if isinstance(result, Exception): logger.error( "Failed to stop %s: %s", adapter.name, result, ) await self.scheduler.stop() self.status_manager.stop() if self.embedding_queue is not None: await self.embedding_queue.stop() if self._classifier is not None: await self._classifier.close() if self.message_cache is not None: await self.message_cache.close() await self.openrouter.close() logger.info("All platforms stopped")
[docs] async def stop_platform(self, platform_name: str) -> None: """Stop a specific platform adapter by name.""" for adapter in self.adapters: if adapter.name == platform_name: await adapter.stop() return raise ValueError(f"No adapter found for platform: {platform_name}")
[docs] def get_adapter(self, platform_name: str) -> PlatformAdapter | None: """Return the adapter for *platform_name*, or ``None``.""" for adapter in self.adapters: if adapter.name == platform_name: return adapter return None
# ------------------------------------------------------------------ # Webhook handling # ------------------------------------------------------------------
async def handle_webhook(
    self,
    platform_name: str,
    channel_id: str,
    event_data: dict[str, Any],
) -> None:
    """Inject an external webhook event into the message pipeline.

    Looks up the named platform adapter, builds an
    :class:`IncomingMessage` from *event_data*, and enqueues it for
    processing through the normal message queue.

    Raises:
        ValueError: If the adapter is not found or not running.
    """
    adapter = self.get_adapter(platform_name)
    if adapter is None:
        raise ValueError(f"No adapter found for platform: {platform_name!r}")
    if not adapter.is_running:
        raise ValueError(f"Platform {platform_name!r} is not running")

    webhook_json = json.dumps(event_data, indent=2)
    is_scheduled = event_data.get("type") == "scheduled_prompt_execution"

    if is_scheduled:
        # Scheduled prompts get explicit anti-loop instructions so the
        # LLM never re-schedules itself in response.
        prompt_id = event_data.get("prompt_id")
        prompt_id_info = f" (ID: {prompt_id})" if prompt_id else ""
        cancellation_note = (
            f"\n\nNOTE: If this scheduled prompt is causing problems or "
            f"is no longer needed, you can cancel it using the "
            f"cancel_scheduled_prompt tool with ID: {prompt_id}"
            if prompt_id
            else ""
        )
        text = (
            f"A scheduled prompt has been triggered for execution"
            f"{prompt_id_info}. Here is the event data:\n\n"
            f"```json\n{webhook_json}\n```\n\n"
            f"IMPORTANT: This is a SCHEDULED PROMPT EXECUTION. Do NOT "
            f"use the schedule_prompt tool in response to this - that "
            f"would create infinite loops. Simply respond to the prompt "
            f"content as if a user had asked it directly."
            f"{cancellation_note}\n\n"
            f"Analyze this scheduled prompt in the context of the recent "
            f"conversation in this channel and respond appropriately."
        )
    else:
        text = (
            f"An external event has occurred via webhook. "
            f"Here is the event data:\n\n"
            f"```json\n{webhook_json}\n```\n\n"
            f"Analyze this event in the context of the recent "
            f"conversation in this channel. If appropriate, announce "
            f"the event to the channel and take any necessary actions "
            f"based on the event type and data provided."
        )

    msg = IncomingMessage(
        platform=platform_name,
        channel_id=channel_id,
        user_id="webhook",
        user_name="Webhook Event",
        text=text,
        is_addressed=True,
    )
    logger.info(
        "Webhook event enqueued for %s:%s (%d chars, scheduled=%s)",
        platform_name,
        channel_id,
        len(text),
        is_scheduled,
    )
    await self._enqueue_message(msg, adapter)
# ------------------------------------------------------------------ # Message queue integration # ------------------------------------------------------------------ async def _enqueue_message( self, msg: IncomingMessage, platform: PlatformAdapter, ) -> None: """Callback given to platform adapters instead of the processor. Converts the incoming message into a :class:`QueuedMessage`, remembers which adapter to use for replies, and starts the per-channel processor loop if it isn't already running. """ # Intercept !stop before it enters the queue so it can cancel # the currently-running response without waiting in line. if msg.text and msg.text.strip().lower() == "!stop": result = await self._cancel_response( msg.platform, msg.channel_id, msg.user_id, msg.extra.get("is_server_admin", False), ) await platform.send(msg.channel_id, result) return channel_key = f"{msg.platform}:{msg.channel_id}" self._channel_adapters[channel_key] = platform queued = QueuedMessage( platform=msg.platform, channel_id=channel_key, user_id=msg.user_id, user_name=msg.user_name, text=msg.text, queued_at=msg.timestamp, extra={"incoming_message": msg}, raw=msg, ) await self.message_queue.enqueue(queued) await self.message_queue.start_processing( channel_key, self._process_queue_item, ) async def _cancel_response( self, platform_name: str, channel_id: str, user_id: str, is_server_admin: bool = False, ) -> str: """Cancel the in-progress response for a channel. Returns a human-readable status string. """ channel_key = f"{platform_name}:{channel_id}" if not self.message_queue.is_channel_processing(channel_key): return "No response is currently in progress in this channel." 
# -- Permission check ------------------------------------------ allowed = False if str(user_id) in (self.cfg.admin_user_ids or []): allowed = True if is_server_admin: allowed = True if not allowed and self.message_cache is not None: try: pending_key = f"pending_response:{platform_name}:{channel_id}" initiator = await self.message_cache.redis_client.hget( pending_key, "user_id", ) if initiator is not None: initiator_str = ( initiator.decode() if isinstance(initiator, bytes) else str(initiator) ) if initiator_str == str(user_id): allowed = True except Exception: logger.debug( "Failed to look up pending response initiator", exc_info=True, ) if not allowed: return ( "You don't have permission to stop this response. " "Only the person who triggered it, server admins, " "or bot admins can stop it." ) cancelled = self.message_queue.cancel_current(channel_key) if cancelled: return "Response stopped." return "No response is currently in progress in this channel." async def _process_queue_item(self, item: QueueItem) -> None: """Queue callback -- dispatches single messages or batches.""" if isinstance(item, MessageBatch): first = item.messages[0] channel_key = first.channel_id platform = self._channel_adapters.get(channel_key) if platform is None: logger.error("No adapter for channel %s", channel_key) return original: IncomingMessage = first.extra["incoming_message"] await self.processor.handle_batch(item, original, platform) else: original = item.extra["incoming_message"] channel_key = item.channel_id platform = self._channel_adapters.get(channel_key) if platform is None: logger.error("No adapter for channel %s", channel_key) return await self.processor.handle_message(original, platform)
# ------------------------------------------------------------------ # Main # ------------------------------------------------------------------
[docs] async def main() -> None: """Main. """ cfg = await asyncio.to_thread(Config.load) # Validate required fields if not cfg.api_key: logger.error( "api_key is required in config.yaml " "or API_KEY / OPENROUTER_API_KEY env var", ) sys.exit(1) if not cfg.platforms: logger.warning( "No platforms configured. Add a 'platforms' section to " "config.yaml or set legacy Matrix fields.", ) # Load tools from the tools directory tool_registry = ToolRegistry() await asyncio.to_thread(load_tools, cfg.tools_dir, tool_registry) # Initialize OAuth manager from oauth_manager import init_oauth_manager init_oauth_manager( encryption_key=cfg.oauth_encryption_key, base_url=cfg.oauth_base_url, providers_config=cfg.oauth_providers, ) # Create the bot runner and expose it to the web app runner = BotRunner(cfg, tool_registry) set_bot_runner(runner) set_scheduled_prompt_runner(runner) set_config(cfg) # Auto-start platforms that have credentials / tokens if runner.adapters: logger.info("Auto-starting %d platform(s) …", len(runner.adapters)) try: await runner.start() except Exception: logger.exception( "Auto-start failed; use the web GUI to manage platforms", ) # Start the web server logger.info( "Starting web GUI on http://%s:%d", cfg.web_host, cfg.web_port, ) server_config = uvicorn.Config( web_app, host=cfg.web_host, port=cfg.web_port, log_level="info", loop="none", ) server = uvicorn.Server(server_config) try: await server.serve() except (KeyboardInterrupt, asyncio.CancelledError): logger.info("Shutting down …") finally: await runner.stop()
if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: pass