"""Entry point for the multi-platform LLM bot.
Handles tool auto-loading, prompt rendering, platform instantiation,
and the web management GUI.
"""
from __future__ import annotations
# Load .env before any other imports that may read os.environ
try:
from dotenv import load_dotenv
load_dotenv()
except ImportError:
pass
import os
os.environ.setdefault("ANONYMIZED_TELEMETRY", "False")
import asyncio
import json
import logging
import sys
from typing import Any, Callable
import uvicorn
from classifiers.vector_classifier import VectorClassifier, initialize_tool_embeddings_from_file
from config import Config, PlatformConfig
from conversation import ConversationManager
from embedding_queue import EmbeddingBatchQueue
from media_cache import MediaCache
from knowledge_graph import KnowledgeGraphManager
from message_cache import MessageCache
from message_processor import MessageProcessor
from message_queue import MessageQueue, QueuedMessage, MessageBatch, QueueItem
from openrouter_client import OpenRouterClient
from platforms.base import IncomingMessage, PlatformAdapter
from prompt_renderer import PromptRenderer
from rag_system.auto_search import RAGAutoSearchManager
from task_manager import TaskManager
from web_search_context import WebSearchContextManager
from threadweave import ThreadweaveManager
from tool_loader import load_tools
from tools import ToolRegistry
from background_tasks import build_scheduler, BackgroundScheduler, startup_channel_backfill
from init_redis_indexes import ensure_indexes
from status_manager import StatusManager
from tools.scheduled_prompt import set_bot_runner as set_scheduled_prompt_runner
from web import app as web_app, set_bot_runner, set_config
# Root logging configuration: timestamped, leveled records for all modules.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
# matrix-nio is very chatty at INFO; keep only warnings and above.
logging.getLogger("nio").setLevel(logging.WARNING)
from log_redaction import ApiKeyRedactionFilter
# Scrub API keys from every record before any handler emits it.
logging.getLogger().addFilter(ApiKeyRedactionFilter())
logger = logging.getLogger(__name__)
# ------------------------------------------------------------------
# Platform factory
# ------------------------------------------------------------------
# ------------------------------------------------------------------
# BotRunner – manages the multi-platform lifecycle
# ------------------------------------------------------------------
class BotRunner:
    """Own the multi-platform bot lifecycle.

    Builds the shared services once (media cache, conversation manager,
    OpenRouter client, optional Redis-backed caches, queues, scheduler,
    message processor), creates one adapter per enabled platform, and
    exposes :meth:`start` / :meth:`stop` so the web GUI can start and
    stop individual platforms on demand.
    """
def __init__(
    self,
    cfg: Config,
    tool_registry: ToolRegistry,
) -> None:
    """Build all shared services and one adapter per enabled platform.

    Redis-dependent subsystems (message cache, knowledge graph, RAG
    auto-search, web-search context, Threadweave, vector classifier,
    embedding queue) are only created when ``cfg.redis_url`` is set;
    their attributes stay ``None`` otherwise.

    Args:
        cfg (Config): Bot configuration object.
        tool_registry (ToolRegistry): Registry of already-loaded tools.
    """
    self.cfg = cfg
    self.tool_registry = tool_registry
    # Build media cache (shared across all platforms)
    self.media_cache = MediaCache(
        cache_dir=cfg.media_cache_dir,
        max_size_mb=cfg.media_cache_max_mb,
    )
    # Build shared core components.
    # NOTE(review): reaches into ToolRegistry._tools (private); a public
    # accessor would be cleaner -- confirm none exists.
    tool_info = [
        {"name": td.name, "description": td.description}
        for td in self.tool_registry._tools.values()
    ]
    renderer = PromptRenderer(
        cfg.system_prompt_file,
        default_extras={"tools": tool_info},
    )
    self.conversation_mgr = ConversationManager(
        prompt_renderer=renderer,
        max_history=cfg.max_history,
    )
    # Apply tool permission whitelist from config
    if cfg.tool_permissions:
        self.tool_registry.set_permissions(cfg.tool_permissions)
    self.openrouter = OpenRouterClient(
        api_key=cfg.api_key,
        model=cfg.model,
        temperature=cfg.temperature,
        max_tokens=cfg.max_tokens,
        tool_registry=self.tool_registry,
        base_url=cfg.llm_base_url,
        gemini_api_key=cfg.gemini_api_key,
    )
    # Optional Redis message cache (and everything layered on it)
    self.message_cache: MessageCache | None = None
    self.kg_manager: KnowledgeGraphManager | None = None
    self._auto_search: RAGAutoSearchManager | None = None
    self._web_search: WebSearchContextManager | None = None
    if cfg.redis_url:
        self.message_cache = MessageCache(
            redis_url=cfg.redis_url,
            openrouter_client=self.openrouter,
            embedding_model=cfg.embedding_model,
            ssl_kwargs=cfg.redis_ssl_kwargs(),
        )
        logger.info(
            "Redis message cache enabled (model: %s)",
            cfg.embedding_model,
        )
        # Quota tracking piggybacks on the cache's Redis connection.
        from gemini_embed_pool import init_quota_tracking
        init_quota_tracking(self.message_cache.redis_client)
        # Knowledge graph manager shares the same Redis connection
        self.kg_manager = KnowledgeGraphManager(
            redis_client=self.message_cache.redis_client,
            openrouter=self.openrouter,
            embedding_model=cfg.embedding_model,
            admin_user_ids=set(cfg.admin_user_ids) if cfg.admin_user_ids else None,
        )
        logger.info("Knowledge graph manager enabled")
        # RAG auto-search manager
        self._auto_search = RAGAutoSearchManager(
            self.message_cache.redis_client,
        )
        logger.info("RAG auto-search manager enabled")
        # Web search context manager (auto-injects Brave results)
        # NOTE(review): default_api_key is the LLM api_key, not a
        # search-specific key -- confirm this is intentional.
        self._web_search = WebSearchContextManager(
            self.message_cache.redis_client,
            default_api_key=cfg.api_key,
            config=cfg,
        )
        logger.info("Web search context manager enabled")
    # Threadweave persistent knowledge system (requires Redis).
    # message_cache is guaranteed non-None here: same condition as above.
    self.threadweave: ThreadweaveManager | None = None
    if cfg.redis_url:
        self.threadweave = ThreadweaveManager(
            redis_client=self.message_cache.redis_client,
            openrouter=self.openrouter,
            embedding_model=cfg.embedding_model,
            admin_user_ids=(
                set(cfg.admin_user_ids) if cfg.admin_user_ids else None
            ),
            dna_vault_path=cfg.dna_vault_path,
        )
        logger.info("Threadweave manager enabled (vault: %s)", cfg.dna_vault_path)
    # Vector classifier for tool selection (requires Redis)
    self._classifier: VectorClassifier | None = None
    if self.message_cache is not None:
        # NOTE(review): uses cfg.openrouter_api_key while the client above
        # uses cfg.api_key -- verify these are meant to differ.
        self._classifier = VectorClassifier(
            redis_client=self.message_cache.redis_client,
            api_key=cfg.openrouter_api_key,
        )
        logger.info("Vector classifier enabled")
    # Task manager for fire-and-forget tool execution
    redis_client = (
        self.message_cache.redis_client
        if self.message_cache is not None
        else None
    )
    self.task_manager = TaskManager(
        timeout=10.0,
        redis=redis_client,
    )
    self.tool_registry.task_manager = self.task_manager
    logger.info("Task manager enabled (timeout=10.0s)")
    # Deferred embedding batch queue (requires Redis)
    self.embedding_queue: EmbeddingBatchQueue | None = None
    if self.message_cache is not None:
        self.embedding_queue = EmbeddingBatchQueue(
            openrouter=self.openrouter,
            redis=self.message_cache.redis_client,
            model=cfg.embedding_model,
            flush_interval=cfg.embedding_flush_interval,
            max_batch_size=cfg.embedding_batch_size,
        )
        logger.info(
            "Embedding batch queue enabled (flush=%.1fs, batch=%d)",
            cfg.embedding_flush_interval, cfg.embedding_batch_size,
        )
    # Status manager (Discord adapter set after adapters are created)
    self.status_manager = StatusManager(openrouter=self.openrouter)
    self.processor = MessageProcessor(
        config=cfg,
        conversation_manager=self.conversation_mgr,
        openrouter=self.openrouter,
        message_cache=self.message_cache,
        kg_manager=self.kg_manager,
        auto_search=self._auto_search,
        task_manager=self.task_manager,
        classifier=self._classifier,
        threadweave=self.threadweave,
        embedding_queue=self.embedding_queue,
        status_manager=self.status_manager,
        web_search=self._web_search,
    )
    # Per-channel message queue with temporal batching
    self.message_queue = MessageQueue(
        default_batch_window=cfg.batch_window,
        max_batch_size=cfg.max_batch_size,
        redis=redis_client,
    )
    logger.info(
        "Message queue enabled (window=%.1fs, max_batch=%d)",
        cfg.batch_window, cfg.max_batch_size,
    )
    # Background task scheduler
    self.scheduler: BackgroundScheduler = build_scheduler(
        redis=redis_client,
        kg_manager=self.kg_manager,
        openrouter=self.openrouter,
    )
    # Map channel_id -> (platform adapter) for routing replies from the queue
    self._channel_adapters: dict[str, PlatformAdapter] = {}
    # Instantiate platform adapters for all enabled platforms
    self.adapters: list[PlatformAdapter] = []
    # Store adapters list reference on the processor so cross-platform
    # tools can access all adapters via ToolContext.all_adapters.
    # This is the same list object — it will reflect adapters added later.
    self.processor._all_adapters = self.adapters
    # Also wire into the prompt context builder so the system prompt
    # can expose the bot's identity on every active platform.
    self.processor._ctx_builder.all_adapters = self.adapters
    for pcfg in cfg.platforms:
        if not pcfg.enabled:
            logger.info("Platform %s is disabled – skipping", pcfg.type)
            continue
        try:
            adapter = create_platform(
                pcfg, self._enqueue_message, self.media_cache,
                message_update_handler=self.processor.handle_message_update,
                message_delete_handler=self.processor.handle_message_delete,
                config=cfg,
                reaction_update_handler=self.processor.handle_reaction_update,
            )
            self.adapters.append(adapter)
            logger.info("Created adapter for platform: %s", pcfg.type)
        except Exception:
            # One bad platform config must not take down the others.
            logger.exception(
                "Failed to create adapter for platform %s", pcfg.type,
            )
    # Wire the cancel callback so slash commands can stop responses.
    for adapter in self.adapters:
        adapter._cancel_callback = self._cancel_response
    # Auto-inject the webchat adapter (always available when web GUI runs)
    if not self.get_adapter("webchat"):
        try:
            from platforms.webchat import WebChatPlatform
            webchat = WebChatPlatform(message_handler=self._enqueue_message)
            webchat._cancel_callback = self._cancel_response
            self.adapters.append(webchat)
            logger.info("Auto-injected webchat platform adapter")
        except Exception:
            logger.warning("Could not inject webchat adapter", exc_info=True)
    # Wire the Discord adapter into the StatusManager for presence updates
    discord_adapter = self.get_adapter("discord")
    if discord_adapter is not None:
        self.status_manager._discord = discord_adapter
    # Legacy compat: expose credentials for the Matrix web login flow
    self._matrix_adapter: Any = None
    for a in self.adapters:
        if a.name == "matrix":
            self._matrix_adapter = a
            break
@property
def is_running(self) -> bool:
"""Check whether is running.
Returns:
bool: True on success, False otherwise.
"""
return any(a.is_running for a in self.adapters)
# Expose the Matrix client for the legacy web GUI status endpoint
@property
def client(self) -> Any:
"""Client.
Returns:
Any: The result.
"""
if self._matrix_adapter is not None:
return self._matrix_adapter.client
return None
# Legacy compat for web.py login
@property
def credentials(self) -> dict | None:
"""Credentials.
"""
if self._matrix_adapter is not None:
return self._matrix_adapter.credentials
return None
@credentials.setter
def credentials(self, value: dict | None) -> None:
"""Credentials.
Args:
value (dict | None): Value to set.
"""
if self._matrix_adapter is not None:
self._matrix_adapter.credentials = value
async def start(self) -> None:
    """Start all platform adapters concurrently.

    Startup order: verify RediSearch / knowledge-graph indexes, seed
    tool embeddings, start the embedding queue, load the media-cache
    index, launch every adapter, then run post-start jobs (embed-key
    pool reload, channel backfill, scheduler, status manager, pending
    response recovery).  Index, backfill, and recovery failures are
    logged and tolerated; per-adapter start failures are logged without
    aborting the remaining adapters.
    """
    # Ensure RediSearch vector indexes exist before processing
    if self.message_cache is not None:
        try:
            await ensure_indexes(
                self.message_cache.redis_client,
            )
            logger.info("RediSearch indexes verified")
        except Exception:
            logger.warning(
                "Could not create RediSearch indexes "
                "(is the RediSearch module loaded?)",
                exc_info=True,
            )
    # Ensure FalkorDB knowledge graph indexes
    if self.kg_manager is not None:
        try:
            await self.kg_manager.ensure_indexes()
            logger.info("Knowledge graph indexes verified")
        except Exception:
            logger.warning(
                "Could not create knowledge graph indexes",
                exc_info=True,
            )
    # Initialize tool embeddings in Redis (no-op if already present).
    # The classifier only exists when message_cache does, so
    # self.message_cache.redis_client is safe to dereference here.
    # (`os` is imported at module level; the former local re-import
    # was redundant and has been removed.)
    if self._classifier is not None:
        index_file = os.path.join(
            os.path.dirname(__file__), "classifiers", "tool_index_data.json",
        )
        if os.path.exists(index_file):
            try:
                await initialize_tool_embeddings_from_file(
                    index_file_path=index_file,
                    redis_client=self.message_cache.redis_client,
                    api_key=self.cfg.openrouter_api_key,
                )
                # Pre-warm classifier's in-memory cache from Redis
                await self._classifier._load_tool_embeddings()
            except Exception:
                logger.warning(
                    "Could not initialize tool embeddings",
                    exc_info=True,
                )
        else:
            logger.info(
                "No tool_index_data.json found; run "
                "'python -m classifiers.build_tool_index' to generate it",
            )
    if self.embedding_queue is not None:
        await self.embedding_queue.start()
    # Load media cache index from disk (non-blocking)
    await self.media_cache.ensure_loaded()
    if not self.adapters:
        logger.warning("No platform adapters configured")
        return
    # Launch all adapters at once; collect failures instead of raising
    # so one broken platform doesn't block the rest.
    results = await asyncio.gather(
        *(a.start() for a in self.adapters),
        return_exceptions=True,
    )
    for adapter, result in zip(self.adapters, results):
        if isinstance(result, Exception):
            logger.error(
                "Failed to start %s: %s", adapter.name, result,
            )
    # Load user-donated embed keys from Redis into the live pool
    if self.message_cache is not None:
        from gemini_embed_pool import reload_pool
        await reload_pool()
    # Backfill recent channel history and enqueue missing embeddings
    if self.message_cache is not None and self.adapters:
        try:
            await startup_channel_backfill(
                redis=self.message_cache.redis_client,
                message_cache=self.message_cache,
                embedding_queue=self.embedding_queue,
                adapters=self.adapters,
            )
        except Exception:
            logger.warning(
                "Startup channel backfill failed", exc_info=True,
            )
    await self.scheduler.start()
    self.status_manager.start()
    # Re-process any responses that were interrupted by the last shutdown
    try:
        await self.processor.recover_pending_responses(
            self._enqueue_message, self.adapters,
        )
    except Exception:
        logger.warning(
            "Pending response recovery failed", exc_info=True,
        )
async def stop(self) -> None:
    """Stop all platform adapters and clean up shared resources.

    Adapters are stopped concurrently; per-adapter failures are logged
    but do not abort the rest of the shutdown.  Shared services are
    then closed in reverse dependency order, ending with the
    OpenRouter client.
    """
    results = await asyncio.gather(
        *(a.stop() for a in self.adapters),
        return_exceptions=True,
    )
    for adapter, result in zip(self.adapters, results):
        if isinstance(result, Exception):
            logger.error(
                "Failed to stop %s: %s", adapter.name, result,
            )
    await self.scheduler.stop()
    self.status_manager.stop()
    if self.embedding_queue is not None:
        await self.embedding_queue.stop()
    if self._classifier is not None:
        await self._classifier.close()
    if self.message_cache is not None:
        await self.message_cache.close()
    await self.openrouter.close()
    logger.info("All platforms stopped")
def get_adapter(self, platform_name: str) -> PlatformAdapter | None:
"""Return the adapter for *platform_name*, or ``None``."""
for adapter in self.adapters:
if adapter.name == platform_name:
return adapter
return None
# ------------------------------------------------------------------
# Webhook handling
# ------------------------------------------------------------------
async def handle_webhook(
    self,
    platform_name: str,
    channel_id: str,
    event_data: dict[str, Any],
) -> None:
    """Inject an external webhook event into the message pipeline.

    Looks up the named platform adapter, builds an
    :class:`IncomingMessage` from *event_data*, and enqueues it for
    processing through the normal message queue.

    Raises:
        ValueError: If the adapter is not found or not running.
    """
    adapter = self.get_adapter(platform_name)
    if adapter is None:
        raise ValueError(f"No adapter found for platform: {platform_name!r}")
    if not adapter.is_running:
        raise ValueError(f"Platform {platform_name!r} is not running")
    # Pretty-print the payload so the LLM sees readable JSON.
    webhook_json = json.dumps(event_data, indent=2)
    # Scheduled-prompt executions get special framing that forbids
    # re-scheduling, to prevent infinite scheduling loops.
    is_scheduled = event_data.get("type") == "scheduled_prompt_execution"
    if is_scheduled:
        prompt_id = event_data.get("prompt_id")
        prompt_id_info = f" (ID: {prompt_id})" if prompt_id else ""
        cancellation_note = (
            f"\n\nNOTE: If this scheduled prompt is causing problems or "
            f"is no longer needed, you can cancel it using the "
            f"cancel_scheduled_prompt tool with ID: {prompt_id}"
            if prompt_id else ""
        )
        text = (
            f"A scheduled prompt has been triggered for execution"
            f"{prompt_id_info}. Here is the event data:\n\n"
            f"```json\n{webhook_json}\n```\n\n"
            f"IMPORTANT: This is a SCHEDULED PROMPT EXECUTION. Do NOT "
            f"use the schedule_prompt tool in response to this - that "
            f"would create infinite loops. Simply respond to the prompt "
            f"content as if a user had asked it directly."
            f"{cancellation_note}\n\n"
            f"Analyze this scheduled prompt in the context of the recent "
            f"conversation in this channel and respond appropriately."
        )
    else:
        text = (
            f"An external event has occurred via webhook. "
            f"Here is the event data:\n\n"
            f"```json\n{webhook_json}\n```\n\n"
            f"Analyze this event in the context of the recent "
            f"conversation in this channel. If appropriate, announce "
            f"the event to the channel and take any necessary actions "
            f"based on the event type and data provided."
        )
    # Synthesize an already-addressed message from a pseudo "webhook" user
    # so the processor treats it like a direct request.
    msg = IncomingMessage(
        platform=platform_name,
        channel_id=channel_id,
        user_id="webhook",
        user_name="Webhook Event",
        text=text,
        is_addressed=True,
    )
    logger.info(
        "Webhook event enqueued for %s:%s (%d chars, scheduled=%s)",
        platform_name, channel_id, len(text), is_scheduled,
    )
    await self._enqueue_message(msg, adapter)
# ------------------------------------------------------------------
# Message queue integration
# ------------------------------------------------------------------
async def _enqueue_message(
    self, msg: IncomingMessage, platform: PlatformAdapter,
) -> None:
    """Callback given to platform adapters instead of the processor.

    Converts the incoming message into a :class:`QueuedMessage`,
    remembers which adapter to use for replies, and starts the
    per-channel processor loop if it isn't already running.

    Args:
        msg (IncomingMessage): The message received from a platform.
        platform (PlatformAdapter): Adapter that produced the message;
            replies for this channel are routed back through it.
    """
    # Intercept !stop before it enters the queue so it can cancel
    # the currently-running response without waiting in line.
    if msg.text and msg.text.strip().lower() == "!stop":
        result = await self._cancel_response(
            msg.platform,
            msg.channel_id,
            msg.user_id,
            msg.extra.get("is_server_admin", False),
        )
        await platform.send(msg.channel_id, result)
        return
    # Queue keys are "platform:channel" so channels with the same id on
    # different platforms never collide.
    channel_key = f"{msg.platform}:{msg.channel_id}"
    self._channel_adapters[channel_key] = platform
    queued = QueuedMessage(
        platform=msg.platform,
        channel_id=channel_key,
        user_id=msg.user_id,
        user_name=msg.user_name,
        text=msg.text,
        queued_at=msg.timestamp,
        # Original message kept so the processor gets full metadata back.
        extra={"incoming_message": msg},
        raw=msg,
    )
    await self.message_queue.enqueue(queued)
    # Presumably idempotent when the channel loop is already running --
    # confirm against MessageQueue.start_processing.
    await self.message_queue.start_processing(
        channel_key, self._process_queue_item,
    )
async def _cancel_response(
    self,
    platform_name: str,
    channel_id: str,
    user_id: str,
    is_server_admin: bool = False,
) -> str:
    """Cancel the in-progress response for a channel.

    Permission model: bot admins (from config), server admins (platform
    flag), or the user who triggered the pending response may cancel.

    Returns:
        str: A human-readable status string.
    """
    channel_key = f"{platform_name}:{channel_id}"
    if not self.message_queue.is_channel_processing(channel_key):
        return "No response is currently in progress in this channel."
    # -- Permission check ------------------------------------------
    allowed = False
    # NOTE(review): user_id is str()-coerced but admin_user_ids entries
    # are compared as-is; if config stores them as ints this never
    # matches -- confirm the config normalizes IDs to strings.
    if str(user_id) in (self.cfg.admin_user_ids or []):
        allowed = True
    if is_server_admin:
        allowed = True
    # Fallback: the user who started the pending response may stop it.
    if not allowed and self.message_cache is not None:
        try:
            pending_key = f"pending_response:{platform_name}:{channel_id}"
            initiator = await self.message_cache.redis_client.hget(
                pending_key, "user_id",
            )
            if initiator is not None:
                # Redis may return bytes or str depending on client config.
                initiator_str = (
                    initiator.decode()
                    if isinstance(initiator, bytes)
                    else str(initiator)
                )
                if initiator_str == str(user_id):
                    allowed = True
        except Exception:
            # Best-effort lookup; failure just means "not the initiator".
            logger.debug(
                "Failed to look up pending response initiator",
                exc_info=True,
            )
    if not allowed:
        return (
            "You don't have permission to stop this response. "
            "Only the person who triggered it, server admins, "
            "or bot admins can stop it."
        )
    cancelled = self.message_queue.cancel_current(channel_key)
    if cancelled:
        return "Response stopped."
    return "No response is currently in progress in this channel."
async def _process_queue_item(self, item: QueueItem) -> None:
    """Queue callback -- dispatches single messages or batches."""
    if isinstance(item, MessageBatch):
        # Batched path: route via the first message's channel key.
        head = item.messages[0]
        key = head.channel_id
        adapter = self._channel_adapters.get(key)
        if adapter is None:
            logger.error("No adapter for channel %s", key)
            return
        incoming: IncomingMessage = head.extra["incoming_message"]
        await self.processor.handle_batch(item, incoming, adapter)
        return
    # Single-message path.
    incoming = item.extra["incoming_message"]
    key = item.channel_id
    adapter = self._channel_adapters.get(key)
    if adapter is None:
        logger.error("No adapter for channel %s", key)
        return
    await self.processor.handle_message(incoming, adapter)
# ------------------------------------------------------------------
# Main
# ------------------------------------------------------------------
async def main() -> None:
    """Program entry point: load config, build the runner, serve the web GUI.

    Loads configuration off-thread, validates required fields, loads
    tools, wires the :class:`BotRunner` into the web app, auto-starts
    configured platforms, and then blocks in the uvicorn server until
    shutdown, finally stopping all platforms.
    """
    cfg = await asyncio.to_thread(Config.load)
    # Validate required fields
    if not cfg.api_key:
        logger.error(
            "api_key is required in config.yaml "
            "or API_KEY / OPENROUTER_API_KEY env var",
        )
        sys.exit(1)
    if not cfg.platforms:
        logger.warning(
            "No platforms configured. Add a 'platforms' section to "
            "config.yaml or set legacy Matrix fields.",
        )
    # Load tools from the tools directory (off-thread: does file I/O)
    tool_registry = ToolRegistry()
    await asyncio.to_thread(load_tools, cfg.tools_dir, tool_registry)
    # Initialize OAuth manager
    from oauth_manager import init_oauth_manager
    init_oauth_manager(
        encryption_key=cfg.oauth_encryption_key,
        base_url=cfg.oauth_base_url,
        providers_config=cfg.oauth_providers,
    )
    # Create the bot runner and expose it to the web app
    runner = BotRunner(cfg, tool_registry)
    set_bot_runner(runner)
    set_scheduled_prompt_runner(runner)
    set_config(cfg)
    # Auto-start platforms that have credentials / tokens
    if runner.adapters:
        logger.info("Auto-starting %d platform(s) …", len(runner.adapters))
        try:
            await runner.start()
        except Exception:
            # Startup failure is non-fatal: the GUI can retry platforms.
            logger.exception(
                "Auto-start failed; use the web GUI to manage platforms",
            )
    # Start the web server
    logger.info(
        "Starting web GUI on http://%s:%d",
        cfg.web_host, cfg.web_port,
    )
    server_config = uvicorn.Config(
        web_app,
        host=cfg.web_host,
        port=cfg.web_port,
        log_level="info",
        # "none": don't install a new event loop; we already run
        # inside asyncio.run().
        loop="none",
    )
    server = uvicorn.Server(server_config)
    try:
        await server.serve()
    except (KeyboardInterrupt, asyncio.CancelledError):
        logger.info("Shutting down …")
    finally:
        # Always stop adapters and background tasks, even on crash.
        await runner.stop()
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C outside the server's own handling: exit quietly.
        pass