Source code for prompt_context

"""Async context builder for system-prompt template variables.

Gathers runtime data from the incoming message, platform adapter,
bot configuration, and (optionally) Redis to produce the full
dictionary that :class:`~prompt_renderer.PromptRenderer` feeds into
the Jinja2 template.
"""

from __future__ import annotations

import asyncio
import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, TYPE_CHECKING

import numpy as np

if TYPE_CHECKING:
    from config import Config
    from conversation import ConversationManager
    from knowledge_graph import KnowledgeGraphManager
    from message_cache import MessageCache
    from platforms.base import IncomingMessage, PlatformAdapter
    from task_manager import TaskManager
    from threadweave import ThreadweaveManager

try:
    from limbic_system import LimbicSystem
    _HAS_LIMBIC = True
except ImportError:
    _HAS_LIMBIC = False

import subprocess
import time

import feature_toggles

logger = logging.getLogger(__name__)

# --- Process-level TTL caches ----------------------------------------------
# Each cache is a pair of module globals (cached value, last-refresh
# timestamp) plus a TTL expressed in seconds.  All caches start empty:
# value ``None`` and timestamp ``0.0``.

# Recent git commits -- 5-minute TTL.
_commits_cache: list[dict[str, str]] | None = None
_commits_cache_ts: float = 0.0
_COMMITS_CACHE_TTL: float = 5 * 60.0

# Public IP addresses -- 1-hour TTL.
_public_ips_cache: dict[str, str] | None = None
_public_ips_cache_ts: float = 0.0
_PUBLIC_IPS_CACHE_TTL: float = 60 * 60.0

# Rendered filesystem directory tree -- 60-second TTL.
_dir_tree_cache: str | None = None
_dir_tree_cache_ts: float = 0.0
_DIR_TREE_CACHE_TTL: float = 60.0

# Directory that contains this module, treated as the repository root.
_REPO_ROOT = Path(__file__).resolve().parent

# Roots rendered in the directory-tree snapshot, in display order: two
# folders under the repo root plus one absolute, host-specific path.
# NOTE(review): the absolute path only exists on one machine; the tree
# builder skips roots that do not exist.
_DIR_TREE_ROOTS: list[Path] = [
    *(_REPO_ROOT / sub for sub in ("data", "tools")),
    Path("/home/star/large_files"),
]

# Entry names (directories or files) that are skipped outright when walking.
_DIR_TREE_EXCLUDES: frozenset[str] = frozenset(
    (
        # VCS metadata and tool caches
        ".git", "__pycache__", ".mypy_cache", ".pytest_cache", ".ruff_cache", ".tox",
        # virtual environments and JS dependencies
        "venv", ".venv", "node_modules",
        # build / packaging output
        "dist", "build", "egg-info", ".eggs",
    )
)

# Recursion depth limit and overall line budget for the rendered tree.
_DIR_TREE_MAX_DEPTH: int = 4
_DIR_TREE_MAX_ENTRIES: int = 300   # total lines before truncation


def _generate_dir_tree(
    *,
    roots: list[Path] | None = None,
    max_depth: int | None = None,
    max_entries: int | None = None,
    excludes: frozenset[str] | None = None,
) -> str:
    """Build a compact text-based directory tree for the configured roots.

    Each existing root is printed as its full path, followed by its contents
    drawn with box-drawing connectors.  Within a directory, non-files
    (directories) are listed before files, each group sorted
    case-insensitively.  Hidden entries, names in *excludes*, ``*.egg-info``
    entries and ``__init__.py`` files are skipped.  Directories that cannot
    be listed (permission denied, vanished, or not actually a directory) are
    shown without children.

    Recursion stops beyond *max_depth* levels.  Once the output holds
    *max_entries* lines, a single ``… (truncated)`` marker is appended at the
    point where entries were dropped, and the walk stops -- including for any
    remaining roots -- so the budget is not exceeded further.

    Args:
        roots: Directories to render; defaults to ``_DIR_TREE_ROOTS``.
            Roots that do not exist are skipped.
        max_depth: Deepest level to list (roots' direct children are level
            1); defaults to ``_DIR_TREE_MAX_DEPTH``.
        max_entries: Line budget before truncation (root headers and blank
            separators count); defaults to ``_DIR_TREE_MAX_ENTRIES``.
        excludes: Exact entry names to skip; defaults to
            ``_DIR_TREE_EXCLUDES``.

    Returns:
        The rendered tree with trailing whitespace stripped, or ``""`` when
        none of the roots exist.
    """
    if roots is None:
        roots = _DIR_TREE_ROOTS
    if max_depth is None:
        max_depth = _DIR_TREE_MAX_DEPTH
    if max_entries is None:
        max_entries = _DIR_TREE_MAX_ENTRIES
    if excludes is None:
        excludes = _DIR_TREE_EXCLUDES

    lines: list[str] = []
    truncated = False

    def _is_visible(entry: Path) -> bool:
        """Return False for hidden, excluded, or packaging-noise entries."""
        name = entry.name
        return not (
            name in excludes
            or name.startswith(".")
            or name == "__init__.py"
            # setuptools metadata is named "<pkg>.egg-info", which an
            # exact-name exclude can never match.
            or name.endswith(".egg-info")
        )

    def _walk(path: Path, prefix: str, depth: int) -> None:
        """Append the children of *path* to ``lines`` (recursively)."""
        nonlocal truncated
        if truncated or depth > max_depth:
            return
        try:
            entries = sorted(path.iterdir(), key=lambda p: (p.is_file(), p.name.lower()))
        except OSError:
            # Unreadable, vanished, or not a directory: render no children.
            return

        visible = [e for e in entries if _is_visible(e)]
        last_idx = len(visible) - 1
        for idx, entry in enumerate(visible):
            if len(lines) >= max_entries:
                # Emit exactly one marker, then unwind every recursion level.
                lines.append(f"{prefix}    … (truncated)")
                truncated = True
                return
            is_last = idx == last_idx
            connector = "└── " if is_last else "├── "
            lines.append(f"{prefix}{connector}{entry.name}")
            if entry.is_dir():
                _walk(entry, prefix + ("    " if is_last else "│   "), depth + 1)
                if truncated:
                    return

    for root in roots:
        if truncated:
            break
        if not root.exists():
            continue
        lines.append(str(root))
        _walk(root, "", 1)
        lines.append("")   # blank separator between roots

    return "\n".join(lines).rstrip()

# Matches a Discord user ID ("snowflake"): 17-20 ASCII digits and nothing
# else.  ``[0-9]`` is used instead of ``\d`` so non-ASCII Unicode digits are
# rejected, and ``\Z`` instead of ``$`` so a trailing newline is rejected.
_DISCORD_SNOWFLAKE_RE = re.compile(r"\A[0-9]{17,20}\Z")

# Prompt data files.  These are *relative* paths, so they resolve against the
# process's current working directory -- i.e. they assume the process is
# started from the repository root.
_PROMPTS_DIR = Path("prompts")
_SELF_JSON_PATH = _PROMPTS_DIR / "self.json"          # persona description
_MODULES_JSON_PATH = _PROMPTS_DIR / "modules.json"
_PANTHEON_JSON_PATH = _PROMPTS_DIR / "pantheon.json"
_GOLDEN_MEMORY_JSON_PATH = _PROMPTS_DIR / "golden_memory.json"
_ARCHITECTURE_JSON_PATH = _PROMPTS_DIR / "architecture.json"


def format_mention(user_id: str, platform: str) -> str:
    """Render *user_id* as a mention string suitable for *platform*.

    On Discord (``discord`` or ``discord-self``) the ID is wrapped as
    ``<@USER_ID>``.  Every other platform gets the ID back unchanged -- e.g.
    Matrix IDs (``@user:server``) already read as mentions.
    """
    is_discord = platform in ("discord", "discord-self")
    return f"<@{user_id}>" if is_discord else user_id
def _pick_admin_id_for_platform(
    admin_ids: list[str],
    platform: str,
) -> str:
    """Choose the admin ID whose format matches *platform*.

    Discord-family platforms take the first ID matching
    ``_DISCORD_SNOWFLAKE_RE``; Matrix takes the first ``@``-prefixed ID.
    When nothing matches, or the platform is neither, the first configured
    ID is used.  An empty list yields ``""``.
    """
    wants_snowflake = platform in ("discord", "discord-self")
    wants_matrix_id = platform == "matrix"

    def _looks_native(candidate: str) -> bool:
        if wants_snowflake:
            return _DISCORD_SNOWFLAKE_RE.match(candidate) is not None
        if wants_matrix_id:
            return candidate.startswith("@")
        return False

    native = next((c for c in admin_ids if _looks_native(c)), None)
    if native is not None:
        return native
    return admin_ids[0] if admin_ids else ""


# Per-process caches: each prompt file is read from disk at most once.
# ``None`` means "not loaded yet"; an empty value means "loaded, nothing there".
_self_json_cache: str | None = None
_modules_json_cache: dict[str, Any] | None = None
_pantheon_json_cache: dict[str, Any] | None = None
_golden_memory_json_cache: dict[str, Any] | None = None
_architecture_json_cache: dict[str, Any] | None = None


def _load_self_json_sync() -> str:
    """Read ``prompts/self.json`` synchronously; ``""`` when it is absent."""
    path = _SELF_JSON_PATH
    return path.read_text(encoding="utf-8") if path.exists() else ""


async def _load_self_json() -> str:
    """Return the text of ``prompts/self.json``, loading it on first use.

    The blocking read runs in a worker thread.  A missing file or any read
    error leaves ``""`` in the cache, so the disk is not consulted again.
    """
    global _self_json_cache
    if _self_json_cache is None:
        try:
            _self_json_cache = await asyncio.to_thread(_load_self_json_sync)
            if _self_json_cache:
                logger.info("Loaded %s (%d bytes)", _SELF_JSON_PATH, len(_self_json_cache))
        except Exception:
            logger.exception("Failed to read %s", _SELF_JSON_PATH)
            _self_json_cache = ""
    return _self_json_cache


def _load_json_dict_sync(path: Path) -> dict[str, Any]:
    """Parse the JSON file at *path*; ``{}`` if it is missing or unparsable."""
    if not path.exists():
        return {}
    try:
        return json.loads(path.read_text(encoding="utf-8"))
    except Exception:
        logger.exception("Failed to parse %s", path)
        return {}


async def _load_json_dict(path: Path, cache_attr: str) -> dict[str, Any]:
    """Return the parsed JSON at *path*, memoised in module global *cache_attr*.

    *cache_attr* names one of the module-level ``_*_json_cache`` variables.
    The first call loads the file in a worker thread and stores the result,
    even when it is empty.  Later calls return the stored object.  Errors
    cache and return ``{}``.
    """
    module_state = globals()
    cached = module_state.get(cache_attr)
    if cached is not None:
        return cached
    try:
        data = await asyncio.to_thread(_load_json_dict_sync, path)
        module_state[cache_attr] = data
        if data:
            logger.info("Loaded JSON %s", path)
    except Exception:
        logger.exception("Failed to read JSON %s", path)
        module_state[cache_attr] = {}
        return {}
    return data
class PromptContextBuilder:
    """Build the template-variable dict consumed by the system prompt.

    Parameters
    ----------
    config:
        The global :class:`Config` instance (provides ``model``,
        ``redis_url``, etc.).
    kg_manager:
        Optional :class:`KnowledgeGraphManager` for injecting knowledge
        graph context into the system prompt.  ``None`` when Redis is
        not configured.
    threadweave_manager, status_manager, message_cache, task_manager,
    conversation_manager, openrouter_client:
        Optional collaborators (see :meth:`__init__`).  Sections that depend
        on a missing collaborator fall back to empty or default values.
    """

    # Model identifiers rewritten before they are exposed to the template.
    _MODEL_REWRITES: dict[str, str] = {
        "gemini-3-pro-preview": "gemini-3.1-pro-preview",
    }

    # (retrieve_context() key, template variable) pairs, in injection order.
    _KNOWLEDGE_TIERS: tuple[tuple[str, str], ...] = (
        ("core", "core_knowledge"),
        ("basic", "basic_knowledge"),
        ("guild", "guild_knowledge"),
        ("channel", "channel_knowledge"),
        ("general", "general_knowledge"),
        ("user", "user_knowledge"),
    )

    # Limbic-injection keys forming the NCM "surface layer".  They are
    # stripped when the ``emotions`` feature toggle is disabled.
    _NCM_SURFACE_KEYS: frozenset[str] = frozenset({
        "cadence_directive",
        "cadence_instruction",
        "cadence_refinement_profile",
        "limbic_state",    # raw neurotransmitter values
        "limbic_cues",     # homeostatic cue strings
        "cascade_cues",    # multi-turn cascade events
    })

    def __init__(
        self,
        config: Config,
        kg_manager: KnowledgeGraphManager | None = None,
        threadweave_manager: ThreadweaveManager | None = None,
        status_manager: Any | None = None,
        message_cache: MessageCache | None = None,
        task_manager: TaskManager | None = None,
        conversation_manager: ConversationManager | None = None,
        openrouter_client: Any | None = None,
    ) -> None:
        """Initialize the builder.

        Args:
            config: Bot configuration object.
            kg_manager: Knowledge-graph manager, or ``None``.
            threadweave_manager: Threadweave manager, or ``None``.
            status_manager: Object exposing ``current_status``, or ``None``.
            message_cache: Message cache used for recent-message lookups and
                as the source of the Redis client, or ``None``.
            task_manager: Background task manager, or ``None``.
            conversation_manager: Conversation manager used for
                context-window statistics, or ``None``.
            openrouter_client: Shared OpenRouterClient for embedding
                connection pooling.
        """
        self._cfg = config
        self._kg: KnowledgeGraphManager | None = kg_manager
        self._threadweave: ThreadweaveManager | None = threadweave_manager
        self._status_manager = status_manager
        self._message_cache: MessageCache | None = message_cache
        self._task_manager: TaskManager | None = task_manager
        self._conversation: ConversationManager | None = conversation_manager
        self._openrouter_client = openrouter_client
        self._limbic: LimbicSystem | None = None
        # All active platform adapters — set by BotRunner after construction
        # so the prompt can expose the bot's identity on every platform.
        self.all_adapters: list[PlatformAdapter] = []
        # ChromaDB singleton — avoid cold-opening SQLite per message.
        self._chroma_collection: Any = None
        self._chroma_client: Any = None
        if _HAS_LIMBIC:
            self._init_limbic()

    # ------------------------------------------------------------------
    # Public entry point
    # ------------------------------------------------------------------

    async def build(
        self,
        msg: IncomingMessage,
        platform: PlatformAdapter,
        query_embedding: np.ndarray | None = None,
    ) -> dict[str, Any]:
        """Collect every context section and return a merged dict.

        Sections that depend on Redis silently return empty values when
        Redis is not configured.  Sections that depend on Discord-specific
        data gracefully degrade on other platforms.  The async sections run
        concurrently; one that raises is logged and skipped.
        """
        ctx: dict[str, Any] = {}

        # ── Sync sections (no I/O, instant) ──
        ctx.update(self._build_runtime(msg, platform))
        ctx.update(self._build_user_details(msg))
        ctx.update(self._build_bot_permissions(msg))
        ctx.update(self._build_proactive(msg))
        ctx.update(self._build_batch(msg))
        ctx.update(self._build_classifier(msg))
        ctx.update(self._build_bot_status())
        ctx.update(self._build_music_prompt(msg))
        ctx.update(self._build_context_window_stats(msg))

        # ── Pre-fetch recent messages once (avoids duplicate Redis calls) ──
        cached_recent = None
        if self._message_cache is not None:
            try:
                cached_recent = await self._message_cache.get_recent(
                    msg.platform, msg.channel_id, count=50,
                )
            except Exception:
                logger.debug(
                    "Pre-fetch of recent messages failed", exc_info=True,
                )

        # ── Async sections — all independent, run in parallel ──
        async_results = await asyncio.gather(
            self._build_self_json(),
            self._build_modules_json(),
            self._build_pantheon_json(),
            self._build_golden_memory(),
            self._build_architecture_json(),
            self._build_channel_goals(msg),
            self._build_knowledge_context(
                msg, query_embedding=query_embedding,
                cached_recent=cached_recent,
            ),
            self._build_threadweave(msg, query_embedding=query_embedding),
            self._build_webhooks(msg, platform),
            self._build_limbic_state(msg),
            self._build_channel_members(msg, platform),
            self._build_active_participants(msg, cached_recent=cached_recent),
            self._build_server_stats(msg),
            self._build_user_privileges(msg),
            self._build_rag_stores(),
            self._build_recent_commits(),
            self._build_public_ips(),
            self._build_user_cloud_storage(msg),
            self._build_dir_tree(),
            return_exceptions=True,
        )
        for result in async_results:
            if isinstance(result, dict):
                ctx.update(result)
            elif isinstance(result, BaseException):
                # Most sections catch their expected failures themselves, so
                # keep the traceback for whatever escapes to this level.
                logger.warning(
                    "Context section failed: %s", result, exc_info=result,
                )
        return ctx

    # ------------------------------------------------------------------
    # Runtime context
    # ------------------------------------------------------------------

    def _build_runtime(
        self,
        msg: IncomingMessage,
        platform: PlatformAdapter,
    ) -> dict[str, Any]:
        """Build bot/owner identity, time, guild/channel and model fields.

        Args:
            msg: Incoming message object; most values come from ``msg.extra``.
            platform: Platform adapter instance (not read here).

        Returns:
            Dict of runtime template variables.
        """
        extra = msg.extra
        now = datetime.now(timezone.utc)
        bot_id = extra.get("bot_id", "")
        owner_id = _pick_admin_id_for_platform(
            self._cfg.admin_user_ids, msg.platform,
        )
        platform_identities = [
            a.bot_identity for a in self.all_adapters
            if a.bot_identity.get("user_id")
        ]
        selfbot_available = any(
            getattr(a, "name", "") == "discord-self"
            and getattr(a, "is_running", False)
            for a in self.all_adapters
        )
        model = self._MODEL_REWRITES.get(self._cfg.model, self._cfg.model)

        return {
            "bot_id": bot_id,
            "bot_mention": format_mention(bot_id, msg.platform) if bot_id else "",
            "owner_id": owner_id,
            "owner_mention": format_mention(owner_id, msg.platform) if owner_id else "",
            "current_time": now.strftime("%Y-%m-%d %H:%M:%S UTC"),
            "guild_name": extra.get("guild_name", ""),
            "guild_id": extra.get("guild_id", ""),
            "channel_name": msg.channel_name or msg.channel_id,
            "channel_id": msg.channel_id,
            "member_count": extra.get("member_count", 0),
            "api_tier": extra.get("api_tier", "standard"),
            "model": model,
            "model_display_name": extra.get("model_display_name", model),
            "channel_topic": extra.get("channel_topic", ""),
            "platform": msg.platform,
            "total_tool_count": extra.get("total_tool_count", 0),
            "platform_identities": platform_identities,
            "selfbot_available": selfbot_available,
        }

    # ------------------------------------------------------------------
    # Interacting user
    # ------------------------------------------------------------------

    @staticmethod
    def _build_user_details(msg: IncomingMessage) -> dict[str, Any]:
        """Expose ``msg.extra["user_details"]`` when present and truthy."""
        raw = msg.extra.get("user_details")
        if not raw:
            return {}
        return {"user_details": raw}

    # ------------------------------------------------------------------
    # Bot permissions in channel
    # ------------------------------------------------------------------

    @staticmethod
    def _build_bot_permissions(
        msg: IncomingMessage,
    ) -> dict[str, Any]:
        """Expose ``msg.extra["bot_permissions"]`` when present and truthy."""
        perms = msg.extra.get("bot_permissions")
        if not perms:
            return {}
        return {"bot_permissions": perms}

    # ------------------------------------------------------------------
    # Channel member list (full guild members with roles, cached)
    # ------------------------------------------------------------------

    @staticmethod
    async def _build_channel_members(
        msg: IncomingMessage,
        platform: PlatformAdapter,
    ) -> dict[str, Any]:
        """Fetch the full guild member list with roles from the platform.

        Delegates to ``platform.get_guild_members()``.  Per the original
        documentation, that call maintains a per-guild in-memory cache with
        a 6-hour TTL.

        Returns:
            Dict with ``channel_members`` key, or empty dict when there is
            no guild, no members, or the lookup fails.
        """
        guild_id = msg.extra.get("guild_id", "")
        if not guild_id:
            return {}
        try:
            members = await platform.get_guild_members(guild_id)
            if members:
                return {"channel_members": members}
        except Exception:
            logger.debug("Failed to fetch guild members", exc_info=True)
        return {}

    # ------------------------------------------------------------------
    # Actively participating users (derived from message cache)
    # ------------------------------------------------------------------

    async def _build_active_participants(
        self,
        msg: IncomingMessage,
        cached_recent: list | None = None,
    ) -> dict[str, Any]:
        """Build a list of users active in the last 50 messages.

        Participants are deduplicated by user ID, in the order the message
        cache returns them (presumably most-recent first).  Messages from
        the bot itself are excluded, identified by ``msg.extra["bot_id"]``.

        Args:
            msg: Incoming message object.
            cached_recent: Pre-fetched recent messages.  Passing them avoids
                a duplicate Redis call, since :meth:`_recent_speaker_ids`
                also uses them.

        Returns:
            Dict with ``actively_participating_users`` key, or empty dict.
        """
        if self._message_cache is None:
            return {}

        recent = cached_recent
        if recent is None:
            try:
                recent = await self._message_cache.get_recent(
                    msg.platform, msg.channel_id, count=50,
                )
            except Exception:
                logger.debug(
                    "Failed to fetch recent messages for active participants",
                    exc_info=True,
                )
                return {}

        bot_id = msg.extra.get("bot_id", "")
        seen: set[str] = set()
        participants: list[str] = []
        for m in recent or ():
            uid = m.user_id
            if not uid or uid in seen or (bot_id and uid == bot_id):
                continue
            seen.add(uid)
            participants.append(f"{m.user_name} ({uid})")

        if not participants:
            return {}
        return {"actively_participating_users": participants}

    # ------------------------------------------------------------------
    # Proactive trigger context
    # ------------------------------------------------------------------

    @staticmethod
    def _build_proactive(msg: IncomingMessage) -> dict[str, Any]:
        """Flag proactive (bot-initiated) turns and their trigger type."""
        if not msg.extra.get("is_proactive"):
            return {}
        return {
            "is_proactive": True,
            "trigger_type": msg.extra.get("trigger_type", "proactive"),
        }

    # ------------------------------------------------------------------
    # Batch response context
    # ------------------------------------------------------------------

    @staticmethod
    def _build_batch(msg: IncomingMessage) -> dict[str, Any]:
        """Flag batched responses and report the batch size."""
        if not msg.extra.get("is_batch_response"):
            return {}
        return {
            "is_batch_response": True,
            "batch_size": msg.extra.get("batch_size", 0),
        }

    # ------------------------------------------------------------------
    # self.json persona blob and other static prompt JSON files
    # ------------------------------------------------------------------

    @staticmethod
    async def _build_self_json() -> dict[str, Any]:
        """Return the raw ``self.json`` text under ``self_json`` (``""`` if none)."""
        content = await _load_self_json()
        return {"self_json": content or ""}

    @staticmethod
    async def _build_modules_json() -> dict[str, Any]:
        """Return parsed ``modules.json`` under ``module_json``."""
        return {"module_json": await _load_json_dict(_MODULES_JSON_PATH, "_modules_json_cache")}

    @staticmethod
    async def _build_pantheon_json() -> dict[str, Any]:
        """Return parsed ``pantheon.json`` under ``pantheon_json``."""
        return {"pantheon_json": await _load_json_dict(_PANTHEON_JSON_PATH, "_pantheon_json_cache")}

    @staticmethod
    async def _build_golden_memory() -> dict[str, Any]:
        """Return parsed ``golden_memory.json`` under ``golden_memory``."""
        return {"golden_memory": await _load_json_dict(_GOLDEN_MEMORY_JSON_PATH, "_golden_memory_json_cache")}

    @staticmethod
    async def _build_architecture_json() -> dict[str, Any]:
        """Return parsed ``architecture.json`` under ``architecture_json``."""
        return {"architecture_json": await _load_json_dict(_ARCHITECTURE_JSON_PATH, "_architecture_json_cache")}

    # ------------------------------------------------------------------
    # Channel goals (requires Redis)
    # ------------------------------------------------------------------

    async def _build_channel_goals(
        self,
        msg: IncomingMessage,
    ) -> dict[str, Any]:
        """Return ``channel_goals`` from ``tools.goal_tools`` when available."""
        if not self._cfg.redis_url or self._message_cache is None:
            return {}
        try:
            from tools.goal_tools import get_channel_goals_for_prompt

            goals = await get_channel_goals_for_prompt(
                msg.channel_id,
                redis_client=self._message_cache.redis_client,
            )
            if goals:
                return {"channel_goals": goals}
        except Exception:
            logger.debug(
                "Channel goals unavailable (Redis or module missing)",
                exc_info=True,
            )
        return {}

    # ------------------------------------------------------------------
    # Knowledge graph context (requires Redis + KnowledgeGraphManager)
    # ------------------------------------------------------------------

    async def _build_knowledge_context(
        self,
        msg: IncomingMessage,
        query_embedding: np.ndarray | None = None,
        cached_recent: list | None = None,
    ) -> dict[str, Any]:
        """Retrieve knowledge graph context for the system prompt.

        Calls ``KnowledgeGraphManager.retrieve_context`` for the message
        text, scoped to up to 5 recent speakers (the interacting user
        first), the channel and the guild.  Each non-empty tier is mapped
        to its ``*_knowledge`` template variable.  The ``user_knowledge``
        key is kept separate so the caller can inject it as a lower-trust
        user message rather than in the system prompt.  Entity metadata
        links (tools, RAG stores, linked files) are added via
        :meth:`_collect_memory_metadata`.
        """
        if self._kg is None:
            return {}

        guild_id = msg.extra.get("guild_id", "")
        user_ids = await self._recent_speaker_ids(msg, cached_recent=cached_recent)
        try:
            context = await self._kg.retrieve_context(
                msg.text or "",
                query_embedding=query_embedding,
                user_ids=user_ids,
                channel_id=msg.channel_id,
                guild_id=guild_id or None,
            )
        except Exception:
            logger.debug("Knowledge graph retrieval failed", exc_info=True)
            return {}
        if not context:
            return {}

        result: dict[str, Any] = {
            out_key: context[tier]
            for tier, out_key in self._KNOWLEDGE_TIERS
            if context.get(tier)
        }
        result.update(self._collect_memory_metadata(context))
        return result

    @staticmethod
    def _string_items(value: Any) -> list[str]:
        """Return the ``str`` elements of *value* if it is a list, else ``[]``."""
        if not isinstance(value, list):
            return []
        return [item for item in value if isinstance(item, str)]

    @classmethod
    def _collect_memory_metadata(cls, context: dict[str, Any]) -> dict[str, Any]:
        """Extract tool / RAG-store / linked-file references from entity metadata.

        Produces flat, order-preserving deduplicated ``memory_tools`` and
        ``memory_rag_stores`` lists for backward compatibility.  It also
        produces ``memory_metadata_links``, which keeps each entity's
        name/description next to its RAG stores and linked files so the
        message processor can use the description as a RAG query and tag
        file contents with their provenance.  Entities whose metadata is
        unparsable or not mapping-like are skipped.  An entity gets a link
        entry only when it references at least one valid RAG store or file.
        Keys with no content are omitted.
        """
        tools: list[str] = []
        rag_stores: list[str] = []
        links: list[dict[str, Any]] = []

        for tier_entities in context.values():
            if not isinstance(tier_entities, list):
                continue
            for entity in tier_entities:
                try:
                    meta = entity.get("metadata")
                    if not meta:
                        continue
                    if isinstance(meta, str):
                        meta = json.loads(meta)
                    entity_tools = cls._string_items(meta.get("tools", []))
                    entity_rags = cls._string_items(meta.get("rag_stores", []))
                    entity_files = cls._string_items(meta.get("linked_files", []))
                except (json.JSONDecodeError, AttributeError):
                    # Bad JSON, or an entity/metadata without ``.get``.
                    continue

                tools.extend(entity_tools)
                rag_stores.extend(entity_rags)
                if entity_rags or entity_files:
                    links.append({
                        "entity_name": entity.get("name", "unknown"),
                        "entity_description": entity.get("description", ""),
                        "rag_stores": entity_rags,
                        "linked_files": entity_files,
                    })

        out: dict[str, Any] = {}
        if tools:
            out["memory_tools"] = list(dict.fromkeys(tools))
        if rag_stores:
            out["memory_rag_stores"] = list(dict.fromkeys(rag_stores))
        if links:
            out["memory_metadata_links"] = links
        return out

    async def _recent_speaker_ids(
        self,
        msg: IncomingMessage,
        limit: int = 5,
        cached_recent: list | None = None,
    ) -> list[str]:
        """Return up to *limit* unique user IDs who spoke recently.

        The interacting user is always first.  Additional speakers come from
        the message cache, in the order it returns them.  If the cache is
        missing or the lookup fails, only the interacting user is returned.
        """
        user_ids = [msg.user_id]
        if self._message_cache is None:
            return user_ids

        recent = cached_recent
        if recent is None:
            try:
                recent = await self._message_cache.get_recent(
                    msg.platform, msg.channel_id, count=50,
                )
            except Exception:
                logger.debug(
                    "Failed to fetch recent messages for speaker list",
                    exc_info=True,
                )
                return user_ids

        seen: set[str] = {msg.user_id}
        for m in recent or ():
            if m.user_id and m.user_id not in seen:
                user_ids.append(m.user_id)
                seen.add(m.user_id)
                if len(user_ids) >= limit:
                    break
        return user_ids

    # ------------------------------------------------------------------
    # Threadweave context (requires Redis + ThreadweaveManager)
    # ------------------------------------------------------------------

    async def _build_threadweave(
        self,
        msg: IncomingMessage,
        query_embedding: np.ndarray | None = None,
    ) -> dict[str, Any]:
        """Retrieve threadweave context for the system prompt.

        The user scope is the batch authors (if any) followed by the
        interacting user, deduplicated in order.  The optional embedding is
        passed on as a plain list of floats.
        """
        if self._threadweave is None:
            return {}

        user_ids = [msg.user_id]
        batch_authors = msg.extra.get("batch_authors")
        if batch_authors:
            # list() so tuples/other sequences concatenate correctly.
            user_ids = list(dict.fromkeys(list(batch_authors) + user_ids))

        emb_list: list[float] | None = None
        if query_embedding is not None:
            emb_list = query_embedding.tolist()

        try:
            context = await self._threadweave.get_context_for_prompt(
                channel_id=msg.channel_id,
                category_id=msg.extra.get("category_id", ""),
                guild_id=msg.extra.get("guild_id", ""),
                user_ids=user_ids,
                query=msg.text or "",
                query_embedding=emb_list,
            )
            if context:
                return {"threadweave_context": context}
        except Exception:
            logger.debug("Threadweave context unavailable", exc_info=True)
        return {}

    # ------------------------------------------------------------------
    # Classifier context (populated by MessageProcessor before build())
    # ------------------------------------------------------------------

    @staticmethod
    def _build_classifier(msg: IncomingMessage) -> dict[str, Any]:
        """Expose complexity/safety/strategy from ``msg.extra["classification"]``."""
        classification = msg.extra.get("classification")
        if not classification:
            return {}
        return {
            "complexity": classification.get("complexity", ""),
            "safety": classification.get("safety", ""),
            "tool_strategy": classification.get("strategy", ""),
        }

    # ------------------------------------------------------------------
    # Channel webhooks (Discord only)
    # ------------------------------------------------------------------

    async def _build_webhooks(
        self,
        msg: IncomingMessage,
        platform: PlatformAdapter,
    ) -> dict[str, Any]:
        """Return ``channel_webhooks`` for Discord channels; ``{}`` elsewhere."""
        if msg.platform not in ("discord", "discord-self"):
            return {}
        try:
            webhooks = await platform.get_channel_webhooks(msg.channel_id)
            if webhooks:
                return {"channel_webhooks": webhooks}
        except Exception:
            logger.debug("Failed to fetch channel webhooks", exc_info=True)
        return {}

    # ------------------------------------------------------------------
    # Bot status (from StatusManager)
    # ------------------------------------------------------------------

    def _build_bot_status(self) -> dict[str, Any]:
        """Return the status manager's ``current_status`` (``""`` if none)."""
        if self._status_manager is None:
            return {"bot_status": ""}
        return {"bot_status": self._status_manager.current_status}

    # ------------------------------------------------------------------
    # Music prompt (data-driven hook for future subsystem)
    # ------------------------------------------------------------------

    @staticmethod
    def _build_music_prompt(msg: IncomingMessage) -> dict[str, Any]:
        """Expose ``msg.extra["music_prompt"]`` (default ``""``)."""
        return {"current_music_prompt": msg.extra.get("music_prompt", "")}

    # ------------------------------------------------------------------
    # Context window statistics (live limit & message count)
    # ------------------------------------------------------------------

    def _build_context_window_stats(
        self,
        msg: IncomingMessage,
    ) -> dict[str, Any]:
        """Return the effective message limit and current count for this channel.

        The limit comes from ``ConversationManager.get_channel_limit`` for
        the ``"{platform}:{channel_id}"`` room.  The count is the length of
        that room's stored history.
        """
        if self._conversation is None:
            return {}
        room_id = f"{msg.platform}:{msg.channel_id}"
        limit = self._conversation.get_channel_limit(room_id)
        current = len(self._conversation._histories.get(room_id, []))
        return {
            "context_window_message_limit": limit,
            "context_window_active_messages": current,
        }

    # ------------------------------------------------------------------
    # User privileges (requires Redis)
    # ------------------------------------------------------------------

    async def _build_user_privileges(
        self,
        msg: IncomingMessage,
    ) -> dict[str, Any]:
        """Fetch the interacting user's privilege bitmask and active privileges.

        Returns a ``user_privileges`` dict with the hex bitmask, the list of
        active privilege names, and admin status.  Returns ``{}`` without
        Redis or on error.
        """
        redis = (
            self._message_cache.redis_client
            if self._message_cache is not None else None
        )
        if redis is None:
            return {}
        try:
            from tools.alter_privileges import (
                get_user_privileges, _mask_to_names, _is_admin,
            )

            mask = await get_user_privileges(redis, msg.user_id, self._cfg)
            return {
                "user_privileges": {
                    "mask_hex": hex(mask),
                    "active": _mask_to_names(mask),
                    "is_admin": _is_admin(msg.user_id, self._cfg),
                },
            }
        except Exception:
            logger.debug("User privileges unavailable", exc_info=True)
        return {}

    # ------------------------------------------------------------------
    # Server statistics (CPU, RAM, disk, OS, etc.)
    # ------------------------------------------------------------------

    async def _build_server_stats(
        self,
        msg: Any = None,
    ) -> dict[str, Any]:
        """Gather live OS-level server statistics.

        Returns a ``server_stats`` dict from ``server_stats.get_server_stats``.
        It is augmented with ``active_task_ids``, the RUNNING background
        tasks.  When *msg* is provided, tasks are filtered to its
        platform/channel; otherwise every running task is listed.  Returns
        ``{}`` when the stats are empty or unavailable.
        """
        try:
            from server_stats import get_server_stats
            from task_manager import TaskStatus

            bg_tasks: list[dict[str, str]] = []
            if self._task_manager is not None:
                for r in self._task_manager._tasks.values():
                    if r.status != TaskStatus.RUNNING:
                        continue
                    if msg is not None and (
                        r.platform != msg.platform
                        or r.channel_id != msg.channel_id
                    ):
                        continue
                    bg_tasks.append({
                        "task_id": r.task_id,
                        "tool_name": r.tool_name,
                        "user": r.user_id,
                    })
            stats = await get_server_stats(
                background_task_count=len(bg_tasks),
            )
            # Check emptiness before adding our own key, otherwise the
            # check can never fail.
            if not stats:
                return {}
            stats["active_task_ids"] = bg_tasks
            return {"server_stats": stats}
        except Exception:
            logger.debug("Server stats unavailable", exc_info=True)
        return {}

    # ------------------------------------------------------------------
    # Public IP addresses (fetched from external service, not local ifaces)
    # ------------------------------------------------------------------

    @staticmethod
    async def _build_public_ips() -> dict[str, Any]:
        """Fetch the bot's public IPv4 and IPv6 addresses via ipify.org.

        Separate endpoints are used so the addresses are resolved
        independently, over one shared HTTP session with a 5 s timeout.
        Results, including empty strings for failed lookups, are cached at
        the process level for ``_PUBLIC_IPS_CACHE_TTL`` seconds so the
        external service is not hit on every message.
        """
        global _public_ips_cache, _public_ips_cache_ts
        now = time.monotonic()
        if (
            _public_ips_cache is not None
            and (now - _public_ips_cache_ts) < _PUBLIC_IPS_CACHE_TTL
        ):
            return _public_ips_cache

        import aiohttp

        async def _fetch(session: aiohttp.ClientSession, url: str) -> str:
            try:
                async with session.get(url) as resp:
                    if resp.status == 200:
                        return (await resp.text()).strip()
            except Exception:
                # Often just "no IPv6 route"; record it without failing.
                logger.debug("Public IP lookup failed for %s", url, exc_info=True)
            return ""

        try:
            async with aiohttp.ClientSession(
                timeout=aiohttp.ClientTimeout(total=5),
            ) as session:
                ipv4, ipv6 = await asyncio.gather(
                    _fetch(session, "https://api4.ipify.org"),
                    _fetch(session, "https://api6.ipify.org"),
                )
        except Exception:
            logger.debug("Public IP lookup session failed", exc_info=True)
            ipv4 = ipv6 = ""

        result = {"public_ipv4": ipv4, "public_ipv6": ipv6}
        _public_ips_cache = result
        _public_ips_cache_ts = now
        logger.debug(
            "Fetched public IPs: IPv4=%s IPv6=%s",
            ipv4 or "<none>", ipv6 or "<none>",
        )
        return result

    # ------------------------------------------------------------------
    # Recent git commits (self-awareness changelog)
    # ------------------------------------------------------------------

    @staticmethod
    async def _build_recent_commits() -> dict[str, Any]:
        """Return the 5 most recent non-merge Git commits of the repository.

        Results, including failures, are cached at the process level for
        ``_COMMITS_CACHE_TTL`` seconds.  Caching failures keeps a missing or
        hanging ``git`` (5 s timeout) from being retried on every message.
        """
        global _commits_cache, _commits_cache_ts
        now = time.monotonic()
        if _commits_cache is not None and (now - _commits_cache_ts) < _COMMITS_CACHE_TTL:
            return {"recent_commits": _commits_cache} if _commits_cache else {}

        # ASCII unit separator: unlike "|", it cannot appear in a commit
        # subject, so splitting never misaligns the fields.
        sep = "\x1f"

        def _fetch() -> list[dict[str, str]]:
            result = subprocess.run(
                [
                    "git", "log", "--no-merges",
                    "--format=%h%x1f%s%x1f%cr%x1f%an", "-5",
                ],
                capture_output=True, text=True, timeout=5,
                cwd=_REPO_ROOT,
            )
            if result.returncode != 0:
                return []
            commits: list[dict[str, str]] = []
            for line in result.stdout.strip().splitlines():
                parts = line.split(sep, 3)
                if len(parts) == 4:
                    commits.append({
                        "hash": parts[0],
                        "message": parts[1],
                        "time_ago": parts[2],
                        "author": parts[3],
                    })
            return commits

        try:
            commits = await asyncio.to_thread(_fetch)
        except Exception:
            logger.debug("Git log unavailable", exc_info=True)
            commits = []
        _commits_cache = commits
        _commits_cache_ts = now
        return {"recent_commits": commits} if commits else {}

    # ------------------------------------------------------------------
    # User cloud storage buckets (requires Redis)
    # ------------------------------------------------------------------

    async def _build_user_cloud_storage(
        self,
        msg: IncomingMessage,
    ) -> dict[str, Any]:
        """Return the list of cloud storage buckets associated with the user.

        Buckets are read as a JSON list from the Redis key
        ``stargazer:user_cloud_buckets:{user_id}``.  Entries are passed
        through unchanged; the original documentation describes each as a
        dict with ``provider`` (e.g. ``"GCP"``, ``"OCI"``) and ``bucket``.

        Returns ``{"user_cloud_storage": [...]}`` when the list is
        non-empty, otherwise ``{}``.
        """
        redis = (
            self._message_cache.redis_client
            if self._message_cache is not None else None
        )
        if redis is None or not msg.user_id:
            return {}
        key = f"stargazer:user_cloud_buckets:{msg.user_id}"
        try:
            raw = await redis.get(key)
            if not raw:
                return {}
            buckets = json.loads(raw)
            if not isinstance(buckets, list) or not buckets:
                return {}
            return {"user_cloud_storage": buckets}
        except Exception:
            logger.debug("User cloud storage lookup failed", exc_info=True)
            return {}

    # ------------------------------------------------------------------
    # Available RAG stores
    # ------------------------------------------------------------------

    @staticmethod
    async def _build_rag_stores() -> dict[str, Any]:
        """List all RAG stores with file counts for system-prompt awareness.

        Delegates to ``list_rag_stores_with_stats`` in a worker thread.  Per
        its original description, that is a lightweight filesystem-only
        scan that opens no ChromaDB clients.
        """
        try:
            from rag_system.file_rag_manager import list_rag_stores_with_stats

            stores = await asyncio.to_thread(list_rag_stores_with_stats)
            return {"available_rag_stores": stores} if stores else {}
        except Exception:
            logger.debug("RAG store listing unavailable", exc_info=True)
            return {}

    # ------------------------------------------------------------------
    # Filesystem directory tree (allowed dirs snapshot)
    # ------------------------------------------------------------------

    @staticmethod
    async def _build_dir_tree() -> dict[str, Any]:
        """Return a compact directory tree for the configured allowed roots.

        Results are cached at the process level for ``_DIR_TREE_CACHE_TTL``
        seconds, so repeated messages do not repeat the filesystem walk.
        """
        global _dir_tree_cache, _dir_tree_cache_ts
        now = time.monotonic()
        if (
            _dir_tree_cache is not None
            and (now - _dir_tree_cache_ts) < _DIR_TREE_CACHE_TTL
        ):
            return {"filesystem_tree": _dir_tree_cache} if _dir_tree_cache else {}
        try:
            tree = await asyncio.to_thread(_generate_dir_tree)
            _dir_tree_cache = tree
            _dir_tree_cache_ts = now
            return {"filesystem_tree": tree} if tree else {}
        except Exception:
            logger.debug("Filesystem tree generation failed", exc_info=True)
            return {}

    # ------------------------------------------------------------------
    # Minimal synchronous fallback (used when full async build times out)
    # ------------------------------------------------------------------

    def build_minimal(
        self,
        msg: IncomingMessage,
        platform: PlatformAdapter,
    ) -> dict[str, Any]:
        """Return a context dict using only synchronous, I/O-free sections.

        Guarantees every unconditional template variable is present, so the
        Jinja2 renderer never receives a bare ``Undefined`` object.  Used as
        the fallback when the full async :meth:`build` call times out or
        raises an unexpected exception.
        """
        ctx: dict[str, Any] = {}
        ctx.update(self._build_runtime(msg, platform))
        ctx.update(self._build_user_details(msg))
        ctx.update(self._build_bot_permissions(msg))
        ctx.update(self._build_proactive(msg))
        ctx.update(self._build_batch(msg))
        ctx.update(self._build_classifier(msg))
        ctx.update(self._build_bot_status())
        ctx.update(self._build_music_prompt(msg))
        ctx.update(self._build_context_window_stats(msg))
        # The static JSON sections that build() always supplies are used
        # with ``| tojson`` in the template.  Provide the same empty defaults
        # here so rendering the fallback context cannot hit ``Undefined``.
        ctx.setdefault("self_json", "")
        ctx.setdefault("module_json", {})
        ctx.setdefault("pantheon_json", {})
        ctx.setdefault("golden_memory", {})
        ctx.setdefault("architecture_json", {})
        return ctx

    # ------------------------------------------------------------------
    # Limbic respiration (injected last for recency bias)
    # ------------------------------------------------------------------

    def _init_limbic(self) -> None:
        """Initialize the LimbicSystem, using Redis DB 12 when Redis is configured.

        Without ``redis_url`` the system is created with no Redis client.
        Otherwise it gets a client on the same server with the path replaced
        by ``/12``, plus a cache client on the original URL.  Any failure
        leaves ``self._limbic`` as ``None``.
        """
        try:
            redis_url = getattr(self._cfg, "redis_url", None)
            if not redis_url:
                self._limbic = LimbicSystem(
                    redis_client=None,
                    openrouter_api_key=getattr(self._cfg, "openrouter_api_key", None),
                    openrouter_client=self._openrouter_client,
                )
                return

            import redis.asyncio as aioredis
            from urllib.parse import urlparse, urlunparse

            parsed = urlparse(redis_url)
            db12_url = urlunparse(parsed._replace(path="/12"))
            ssl_kwargs = self._cfg.redis_ssl_kwargs() if self._cfg else {}
            self._limbic = LimbicSystem(
                redis_client=aioredis.from_url(
                    db12_url, decode_responses=True, **ssl_kwargs,
                ),
                openrouter_api_key=getattr(self._cfg, "openrouter_api_key", None),
                openrouter_client=self._openrouter_client,
                cache_redis_client=aioredis.from_url(
                    redis_url, decode_responses=True, **ssl_kwargs,
                ),
            )
        except Exception as e:
            logger.warning("Failed to init limbic system: %s", e)
            self._limbic = None

    async def _build_limbic_state(
        self,
        msg: IncomingMessage,
    ) -> dict[str, Any]:
        """Inhale + Golden Goddess auto-reflex.

        1. Inhale the shard-based limbic state for this channel.
        2. Scan the user message for emotional triggers (hardcoded, not
           LLM-dependent).
        3. Auto-query ChromaDB golden_goddess for matched triggers.
        4. Combine limbic state + oracle fragments into the context injection.

        Returns ``{}`` when the limbic system is unavailable or inhale fails.
        """
        if self._limbic is None:
            return {}

        # NOTE: The limbic inhale always runs -- !emotions off only disables
        # the NCM neurotransmitter surface layer (cadence directives, raw
        # chemical values), NOT the Sigma Limbic Recursion Core.
        try:
            channel_id = str(msg.channel_id)
            state = await self._limbic.inhale(channel_id)

            # Cascade cues recorded in the last exhale's meta_state.
            cascade_cues = state.get("meta_state", {}).get("cascade_cues", [])

            injection = LimbicSystem.format_context_injection(
                vector=state["vector"],
                cues=state.get("cues", []),
                dominant=state.get("dominant_emotions", []),
                cascade_cues=cascade_cues,
            )

            # ── Strip NCM surface layer when !emotions off ──
            # Core emotional awareness (dominant emotions, narrative tone)
            # stays; only the neurotransmitter / cadence keys are removed.
            redis = (
                self._message_cache.redis_client
                if self._message_cache is not None else None
            )
            if redis is not None:
                toggle_key = f"{msg.platform}:{msg.channel_id}"
                if await feature_toggles.is_disabled(redis, "emotions", toggle_key):
                    for key in self._NCM_SURFACE_KEYS:
                        injection.pop(key, None)
                    logger.debug(
                        "Emotions disabled for %s -- stripped NCM surface layer, "
                        "kept core emotional awareness",
                        toggle_key,
                    )

            # ── GOLDEN GODDESS AUTO-REFLEX (hardcoded) ──
            # Scan the user message for emotional triggers and query the
            # Oracle directly; no LLM instruction-following required.
            if msg.text:
                oracle_fragments = await self._divine_reflex(msg.text)
                if oracle_fragments:
                    injection["oracle_fragments"] = oracle_fragments

            return injection
        except Exception as e:
            logger.warning("Limbic inhale failed: %s", e)
            return {}

    async def _divine_reflex(self, text: str) -> list[str]:
        """The Golden Goddess auto-reflex: hardcoded trigger → ChromaDB query.

        Scans *text* for emotional trigger words, using the limbic system's
        matcher when it has one and ``ncm_delta_parser.scan_text_for_triggers``
        otherwise.  It then queries the ``ncm_kernel`` collection of the
        ``rag_stores/golden_goddess`` ChromaDB store with the top 3 trigger
        names, returning up to 3 document fragments.  Any failure yields
        ``[]``.  This replaces the LLM-dependent query_golden_goddess tool
        call with a code-level reflex.
        """
        matcher = (
            getattr(self._limbic, "_trigger_matcher", None)
            if self._limbic else None
        )
        if matcher:
            try:
                matches = await self._limbic.scan_triggers(text)
            except Exception:
                matches = []
        else:
            try:
                from ncm_delta_parser import scan_text_for_triggers
                matches = scan_text_for_triggers(text)
            except ImportError:
                return []

        if not matches:
            return []

        # Take the top 3 strongest trigger matches.
        trigger_names = [name for name, _ in matches[:3]]
        query_str = " ".join(trigger_names)

        try:
            import os
            import chromadb  # noqa: F401 -- availability check

            # ── Singleton: init ChromaDB client + collection once ──
            if self._chroma_collection is None:
                project_root = os.path.dirname(os.path.abspath(__file__))
                store_path = os.path.join(project_root, "rag_stores", "golden_goddess")
                if not os.path.exists(store_path):
                    return []
                try:
                    from ncm_local_embeddings import EnhancedLocalNCMEmbedder
                except ImportError:
                    return []
                # Use the shared registry to avoid a singleton collision with
                # the query_golden_goddess_v2 tool.
                from chroma_registry import get_client
                self._chroma_client = get_client(store_path)
                self._chroma_collection = self._chroma_client.get_collection(
                    name="ncm_kernel",
                    embedding_function=EnhancedLocalNCMEmbedder(),
                )
                logger.info("ChromaDB golden_goddess collection initialized (singleton)")

            collection = self._chroma_collection

            def _query() -> list[str]:
                """Run the blocking ChromaDB query (called in a worker thread)."""
                results = collection.query(query_texts=[query_str], n_results=3)
                docs = results.get("documents", [[]])[0]
                return docs if docs else []

            fragments = await asyncio.to_thread(_query)
            if fragments:
                logger.info(
                    "Divine reflex fired: triggers=%s, %d fragments retrieved",
                    trigger_names, len(fragments),
                )
            return fragments
        except ImportError:
            return []
        except Exception as e:
            logger.debug("Divine reflex query failed: %s", e)
            return []