"""Async context builder for system-prompt template variables.
Gathers runtime data from the incoming message, platform adapter,
bot configuration, and (optionally) Redis to produce the full
dictionary that :class:`~prompt_renderer.PromptRenderer` feeds into
the Jinja2 template.
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, TYPE_CHECKING
import numpy as np
if TYPE_CHECKING:
from config import Config
from conversation import ConversationManager
from knowledge_graph import KnowledgeGraphManager
from message_cache import MessageCache
from platforms.base import IncomingMessage, PlatformAdapter
from task_manager import TaskManager
from threadweave import ThreadweaveManager
try:
from limbic_system import LimbicSystem
_HAS_LIMBIC = True
except ImportError:
_HAS_LIMBIC = False
import subprocess
import time
import feature_toggles
logger = logging.getLogger(__name__)
# Process-level cache for recent git commits (refreshed every 5 minutes).
# The *_ts companions hold time.monotonic() stamps of the last refresh.
_commits_cache: list[dict[str, str]] | None = None
_commits_cache_ts: float = 0.0
_COMMITS_CACHE_TTL: float = 300.0  # seconds
# Process-level cache for public IP addresses (refreshed every 1 hour).
_public_ips_cache: dict[str, str] | None = None
_public_ips_cache_ts: float = 0.0
_PUBLIC_IPS_CACHE_TTL: float = 3600.0  # seconds
# Process-level cache for filesystem directory tree (refreshed every 60 s).
_dir_tree_cache: str | None = None
_dir_tree_cache_ts: float = 0.0
_DIR_TREE_CACHE_TTL: float = 60.0  # seconds
# Repo root = directory containing this module.
_REPO_ROOT = Path(__file__).resolve().parent
# Directories to include in the tree snapshot, relative to repo root unless absolute.
_DIR_TREE_ROOTS: list[Path] = [
    _REPO_ROOT / "data",
    _REPO_ROOT / "tools",
    Path("/home/star/large_files"),
]
# Names to skip entirely when walking (dirs and files).
_DIR_TREE_EXCLUDES: frozenset[str] = frozenset({
    ".git", "__pycache__", "venv", ".venv", "node_modules",
    ".mypy_cache", ".pytest_cache", ".ruff_cache", ".tox",
    "dist", "build", "egg-info", ".eggs",
})
_DIR_TREE_MAX_DEPTH: int = 4
_DIR_TREE_MAX_ENTRIES: int = 300 # total lines before truncation
def _generate_dir_tree() -> str:
    """Build a compact text-based directory tree for the configured roots.

    Excludes hidden entries, noise directories, and ``__init__.py`` files.
    Stops recursing beyond ``_DIR_TREE_MAX_DEPTH`` levels and truncates
    after ``_DIR_TREE_MAX_ENTRIES`` total lines to avoid context bloat.

    Returns:
        str: Newline-joined tree with one section per existing root,
        ``""`` when no configured root exists.
    """
    lines: list[str] = []
    def _walk(path: Path, prefix: str, depth: int) -> None:
        # Recursion guards: depth cap and global line cap.
        if depth > _DIR_TREE_MAX_DEPTH:
            return
        if len(lines) >= _DIR_TREE_MAX_ENTRIES:
            return
        try:
            # Directories first, then files, each alphabetically (case-insensitive).
            entries = sorted(path.iterdir(), key=lambda p: (p.is_file(), p.name.lower()))
        except PermissionError:
            # Unreadable directory: silently omit its contents.
            return
        visible = [
            e for e in entries
            if e.name not in _DIR_TREE_EXCLUDES
            and not e.name.startswith(".")
            and e.name != "__init__.py"
        ]
        for idx, entry in enumerate(visible):
            # NOTE(review): once the cap is hit deep in the recursion,
            # each ancestor level appends its own "(truncated)" marker,
            # so output can exceed the cap by up to the recursion depth.
            if len(lines) >= _DIR_TREE_MAX_ENTRIES:
                lines.append(f"{prefix} … (truncated)")
                return
            connector = "└── " if idx == len(visible) - 1 else "├── "
            lines.append(f"{prefix}{connector}{entry.name}")
            if entry.is_dir():
                # Continue the vertical rule only for non-last entries.
                extension = " " if idx == len(visible) - 1 else "│ "
                _walk(entry, prefix + extension, depth + 1)
    for root in _DIR_TREE_ROOTS:
        if not root.exists():
            continue
        lines.append(str(root))
        _walk(root, "", 1)
        lines.append("") # blank separator between roots
    return "\n".join(lines).rstrip()
_DISCORD_SNOWFLAKE_RE = re.compile(r"^\d{17,20}$")
# Path to the persona description file (relative to repo root).
_SELF_JSON_PATH = Path("prompts/self.json")
_MODULES_JSON_PATH = Path("prompts/modules.json")
_PANTHEON_JSON_PATH = Path("prompts/pantheon.json")
_GOLDEN_MEMORY_JSON_PATH = Path("prompts/golden_memory.json")
_ARCHITECTURE_JSON_PATH = Path("prompts/architecture.json")
def _pick_admin_id_for_platform(
admin_ids: list[str], platform: str,
) -> str:
"""Return the first admin user ID that looks native to *platform*."""
for uid in admin_ids:
if platform in ("discord", "discord-self") and _DISCORD_SNOWFLAKE_RE.match(uid):
return uid
if platform == "matrix" and uid.startswith("@"):
return uid
return admin_ids[0] if admin_ids else ""
# Cached contents so we only read the file once per process.
_self_json_cache: str | None = None
_modules_json_cache: dict[str, Any] | None = None
_pantheon_json_cache: dict[str, Any] | None = None
_golden_memory_json_cache: dict[str, Any] | None = None
_architecture_json_cache: dict[str, Any] | None = None


def _load_self_json_sync() -> str:
    """Blocking read of the persona file; returns "" when it is absent."""
    if not _SELF_JSON_PATH.exists():
        return ""
    return _SELF_JSON_PATH.read_text(encoding="utf-8")
async def _load_self_json() -> str:
    """Return the contents of ``prompts/self.json``, or ``""``.

    The file is read at most once per process; every later call returns
    the cached string (empty after a read failure).
    """
    global _self_json_cache
    if _self_json_cache is not None:
        return _self_json_cache
    try:
        content = await asyncio.to_thread(_load_self_json_sync)
        _self_json_cache = content
        if content:
            logger.info(
                "Loaded %s (%d bytes)", _SELF_JSON_PATH, len(content),
            )
    except Exception:
        logger.exception("Failed to read %s", _SELF_JSON_PATH)
        _self_json_cache = ""
    return _self_json_cache
def _load_json_dict_sync(path: Path) -> dict[str, Any]:
"""Blocking helper -- read and parse JSON dict from disk."""
if path.exists():
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
logger.exception("Failed to parse %s", path)
return {}
async def _load_json_dict(path: Path, cache_attr: str) -> dict[str, Any]:
    """Return the parsed JSON dict from *path*, or ``{}``.

    *cache_attr* names the module-level global that acts as the
    per-process cache slot, so each file is only read once.
    """
    existing = globals().get(cache_attr)
    if existing is not None:
        return existing
    try:
        data = await asyncio.to_thread(_load_json_dict_sync, path)
        globals()[cache_attr] = data
        if data:
            logger.info("Loaded JSON %s", path)
        return data
    except Exception:
        logger.exception("Failed to read JSON %s", path)
        globals()[cache_attr] = {}
        return {}
class PromptContextBuilder:
"""Build the template-variable dict consumed by the system prompt.
Parameters
----------
config:
The global :class:`Config` instance (provides ``model``,
``redis_url``, etc.).
kg_manager:
Optional :class:`KnowledgeGraphManager` for injecting knowledge
graph context into the system prompt. ``None`` when Redis is
not configured.
"""
def __init__(
    self,
    config: Config,
    kg_manager: KnowledgeGraphManager | None = None,
    threadweave_manager: ThreadweaveManager | None = None,
    status_manager: Any | None = None,
    message_cache: MessageCache | None = None,
    task_manager: TaskManager | None = None,
    conversation_manager: ConversationManager | None = None,
    openrouter_client: Any | None = None,
) -> None:
    """Wire up the collaborators used to assemble prompt context.

    Args:
        config: Bot configuration (model, redis_url, admin IDs, ...).
        kg_manager: Knowledge-graph manager; ``None`` without Redis.
        threadweave_manager: Optional threadweave context provider.
        status_manager: Optional provider of ``current_status``.
        message_cache: Optional Redis-backed recent-message cache.
        task_manager: Optional background-task registry.
        conversation_manager: Optional history/limit tracker.
        openrouter_client: Shared OpenRouterClient for embedding
            connection pooling.
    """
    self._cfg = config
    self._kg: KnowledgeGraphManager | None = kg_manager
    self._threadweave: ThreadweaveManager | None = threadweave_manager
    self._status_manager = status_manager
    self._message_cache: MessageCache | None = message_cache
    self._task_manager: TaskManager | None = task_manager
    self._conversation: ConversationManager | None = conversation_manager
    self._openrouter_client = openrouter_client
    self._limbic: LimbicSystem | None = None
    # Filled in by BotRunner after construction so the prompt can show
    # the bot's identity on every connected platform.
    self.all_adapters: list[PlatformAdapter] = []
    # ChromaDB singletons -- avoid cold-opening SQLite per message.
    self._chroma_collection: Any = None
    self._chroma_client: Any = None
    # Only start the limbic subsystem when its module imported cleanly.
    if _HAS_LIMBIC:
        self._init_limbic()
# ------------------------------------------------------------------
# Public entry point
# ------------------------------------------------------------------
async def build(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
    query_embedding: np.ndarray | None = None,
) -> dict[str, Any]:
    """Collect every context section and return one merged dict.

    Sections that depend on Redis silently produce empty values when
    Redis is not configured; Discord-specific sections degrade
    gracefully on other platforms.
    """
    ctx: dict[str, Any] = {}
    # Synchronous sections: no I/O, evaluated inline in order.
    for section in (
        self._build_runtime(msg, platform),
        self._build_user_details(msg),
        self._build_bot_permissions(msg),
        self._build_proactive(msg),
        self._build_batch(msg),
        self._build_classifier(msg),
        self._build_bot_status(),
        self._build_music_prompt(msg),
        self._build_context_window_stats(msg),
    ):
        ctx.update(section)
    # Fetch the recent-message window once and share it with every
    # section that wants it, avoiding duplicate Redis round-trips.
    recent_window = None
    if self._message_cache is not None:
        try:
            recent_window = await self._message_cache.get_recent(
                msg.platform, msg.channel_id, count=50,
            )
        except Exception:
            logger.debug(
                "Pre-fetch of recent messages failed",
                exc_info=True,
            )
    # Async sections are mutually independent -- run them concurrently.
    outcomes = await asyncio.gather(
        self._build_self_json(),
        self._build_modules_json(),
        self._build_pantheon_json(),
        self._build_golden_memory(),
        self._build_architecture_json(),
        self._build_channel_goals(msg),
        self._build_knowledge_context(
            msg, query_embedding=query_embedding,
            cached_recent=recent_window,
        ),
        self._build_threadweave(
            msg, query_embedding=query_embedding,
        ),
        self._build_webhooks(msg, platform),
        self._build_limbic_state(msg),
        self._build_channel_members(msg, platform),
        self._build_active_participants(
            msg, cached_recent=recent_window,
        ),
        self._build_server_stats(msg),
        self._build_user_privileges(msg),
        self._build_rag_stores(),
        self._build_recent_commits(),
        self._build_public_ips(),
        self._build_user_cloud_storage(msg),
        self._build_dir_tree(),
        return_exceptions=True,
    )
    # Merge successful sections; log (but tolerate) failed ones.
    for outcome in outcomes:
        if isinstance(outcome, dict):
            ctx.update(outcome)
        elif isinstance(outcome, BaseException):
            logger.warning("Context section failed: %s", outcome)
    return ctx
# ------------------------------------------------------------------
# Runtime context
# ------------------------------------------------------------------
def _build_runtime(
    self,
    msg: IncomingMessage,
    platform: PlatformAdapter,
) -> dict[str, Any]:
    """Assemble the always-present runtime variables for the template.

    Covers bot/owner identity, current time, guild/channel metadata,
    the (possibly rewritten) model name, and cross-platform identities.

    Args:
        msg (IncomingMessage): Incoming message object.
        platform (PlatformAdapter): Platform adapter instance.

    Returns:
        dict[str, Any]: Runtime template variables (always populated).
    """
    # NOTE(review): ``format_mention`` is used below but is not imported
    # in this module's visible header -- presumably supplied by another
    # import elsewhere in the file; confirm it is in scope at runtime.
    extra = msg.extra
    now = datetime.now(timezone.utc)
    bot_id = extra.get("bot_id", "")
    guild_name = extra.get("guild_name", "")
    guild_id = extra.get("guild_id", "")
    channel_topic = extra.get("channel_topic", "")
    member_count = extra.get("member_count", 0)
    # Pick the admin ID whose format matches the current platform.
    owner_id = _pick_admin_id_for_platform(
        self._cfg.admin_user_ids, msg.platform,
    )
    # Identities of this bot on every connected adapter (set by BotRunner).
    platform_identities = [
        a.bot_identity for a in self.all_adapters if a.bot_identity.get("user_id")
    ]
    # True when a running discord-self adapter is available.
    selfbot_available = any(
        getattr(a, "name", "") == "discord-self" and getattr(a, "is_running", False)
        for a in self.all_adapters
    )
    # Map renamed model IDs onto their current names for display.
    _MODEL_REWRITES = {
        "gemini-3-pro-preview": "gemini-3.1-pro-preview",
    }
    model = _MODEL_REWRITES.get(self._cfg.model, self._cfg.model)
    return {
        "bot_id": bot_id,
        "bot_mention": format_mention(bot_id, msg.platform) if bot_id else "",
        "owner_id": owner_id,
        "owner_mention": format_mention(owner_id, msg.platform) if owner_id else "",
        "current_time": now.strftime("%Y-%m-%d %H:%M:%S UTC"),
        "guild_name": guild_name,
        "guild_id": guild_id,
        "channel_name": msg.channel_name or msg.channel_id,
        "channel_id": msg.channel_id,
        "member_count": member_count,
        "api_tier": extra.get("api_tier", "standard"),
        "model": model,
        "model_display_name": extra.get(
            "model_display_name", model,
        ),
        "channel_topic": channel_topic,
        "platform": msg.platform,
        "total_tool_count": extra.get("total_tool_count", 0),
        "platform_identities": platform_identities,
        "selfbot_available": selfbot_available,
    }
# ------------------------------------------------------------------
# Interacting user
# ------------------------------------------------------------------
@staticmethod
def _build_user_details(msg: IncomingMessage) -> dict[str, Any]:
"""Internal helper: build user details.
Args:
msg (IncomingMessage): Incoming message object.
Returns:
dict[str, Any]: The result.
"""
raw = msg.extra.get("user_details")
if not raw:
return {}
return {"user_details": raw}
# ------------------------------------------------------------------
# Bot permissions in channel
# ------------------------------------------------------------------
@staticmethod
def _build_bot_permissions(
msg: IncomingMessage,
) -> dict[str, Any]:
"""Internal helper: build bot permissions.
Args:
msg (IncomingMessage): Incoming message object.
Returns:
dict[str, Any]: The result.
"""
perms = msg.extra.get("bot_permissions")
if not perms:
return {}
return {"bot_permissions": perms}
# ------------------------------------------------------------------
# Channel member list (full guild members with roles, cached)
# ------------------------------------------------------------------
@staticmethod
async def _build_channel_members(
msg: IncomingMessage,
platform: PlatformAdapter,
) -> dict[str, Any]:
"""Fetch the full guild member list with roles from the platform.
Delegates to ``platform.get_guild_members()`` which maintains
a per-guild in-memory cache with a 6-hour TTL.
Args:
msg: Incoming message object.
platform: Platform adapter instance.
Returns:
Dict with ``channel_members`` key, or empty dict.
"""
guild_id = msg.extra.get("guild_id", "")
if not guild_id:
return {}
try:
members = await platform.get_guild_members(guild_id)
if members:
return {"channel_members": members}
except Exception:
logger.debug(
"Failed to fetch guild members", exc_info=True,
)
return {}
# ------------------------------------------------------------------
# Actively participating users (derived from message cache)
# ------------------------------------------------------------------
async def _build_active_participants(
self,
msg: IncomingMessage,
cached_recent: list | None = None,
) -> dict[str, Any]:
"""Build a list of users active in the last 50 messages.
Derives unique participants from the message cache, ordered by
most-recent activity. Excludes the bot's own messages.
Args:
msg: Incoming message object.
cached_recent: Pre-fetched recent messages (avoids duplicate
Redis call when also used by ``_recent_speaker_ids``).
Returns:
Dict with ``actively_participating_users`` key, or empty dict.
"""
if self._message_cache is None:
return {}
recent = cached_recent
if recent is None:
try:
recent = await self._message_cache.get_recent(
msg.platform, msg.channel_id, count=50,
)
except Exception:
logger.debug(
"Failed to fetch recent messages for active participants",
exc_info=True,
)
return {}
seen: set[str] = set()
participants: list[str] = []
for m in recent:
if m.user_id and m.user_id not in seen:
seen.add(m.user_id)
participants.append(
f"{m.user_name} ({m.user_id})",
)
if not participants:
return {}
return {"actively_participating_users": participants}
# ------------------------------------------------------------------
# Proactive trigger context
# ------------------------------------------------------------------
@staticmethod
def _build_proactive(msg: IncomingMessage) -> dict[str, Any]:
"""Internal helper: build proactive.
Args:
msg (IncomingMessage): Incoming message object.
Returns:
dict[str, Any]: The result.
"""
if not msg.extra.get("is_proactive"):
return {}
return {
"is_proactive": True,
"trigger_type": msg.extra.get(
"trigger_type", "proactive",
),
}
# ------------------------------------------------------------------
# Batch response context
# ------------------------------------------------------------------
@staticmethod
def _build_batch(msg: IncomingMessage) -> dict[str, Any]:
"""Internal helper: build batch.
Args:
msg (IncomingMessage): Incoming message object.
Returns:
dict[str, Any]: The result.
"""
if not msg.extra.get("is_batch_response"):
return {}
return {
"is_batch_response": True,
"batch_size": msg.extra.get("batch_size", 0),
}
# ------------------------------------------------------------------
# self.json persona blob
# ------------------------------------------------------------------
@staticmethod
async def _build_self_json() -> dict[str, Any]:
    """Expose the raw persona blob under the ``self_json`` key."""
    return {"self_json": await _load_self_json()}
@staticmethod
async def _build_modules_json() -> dict[str, Any]:
    """Expose parsed ``prompts/modules.json`` under ``module_json``."""
    data = await _load_json_dict(_MODULES_JSON_PATH, "_modules_json_cache")
    return {"module_json": data}
@staticmethod
async def _build_pantheon_json() -> dict[str, Any]:
    """Expose parsed ``prompts/pantheon.json`` under ``pantheon_json``."""
    data = await _load_json_dict(_PANTHEON_JSON_PATH, "_pantheon_json_cache")
    return {"pantheon_json": data}
@staticmethod
async def _build_golden_memory() -> dict[str, Any]:
    """Expose parsed ``prompts/golden_memory.json`` under ``golden_memory``."""
    data = await _load_json_dict(_GOLDEN_MEMORY_JSON_PATH, "_golden_memory_json_cache")
    return {"golden_memory": data}
@staticmethod
async def _build_architecture_json() -> dict[str, Any]:
    """Expose parsed ``prompts/architecture.json`` under ``architecture_json``."""
    data = await _load_json_dict(_ARCHITECTURE_JSON_PATH, "_architecture_json_cache")
    return {"architecture_json": data}
# ------------------------------------------------------------------
# Channel goals (requires Redis)
# ------------------------------------------------------------------
async def _build_channel_goals(
self,
msg: IncomingMessage,
) -> dict[str, Any]:
"""Internal helper: build channel goals.
Args:
msg (IncomingMessage): Incoming message object.
Returns:
dict[str, Any]: The result.
"""
if not self._cfg.redis_url or self._message_cache is None:
return {}
try:
from tools.goal_tools import (
get_channel_goals_for_prompt,
)
goals = await get_channel_goals_for_prompt(
msg.channel_id,
redis_client=self._message_cache.redis_client,
)
if goals:
return {"channel_goals": goals}
except Exception:
logger.debug(
"Channel goals unavailable (Redis or module missing)",
exc_info=True,
)
return {}
# ------------------------------------------------------------------
# Knowledge graph context (requires Redis + KnowledgeGraphManager)
# ------------------------------------------------------------------
async def _build_knowledge_context(
    self,
    msg: IncomingMessage,
    query_embedding: np.ndarray | None = None,
    cached_recent: list | None = None,
) -> dict[str, Any]:
    """Retrieve knowledge graph context for the system prompt.

    Uses hybrid vector + graph retrieval, then groups results by
    category tier for authority-aware prompt injection.

    User-scoped knowledge is retrieved for up to 5 recent speakers
    in the channel (always including the interacting user). The
    ``user_knowledge`` key is kept separate so the caller can inject
    it as a lower-trust user message rather than in the system prompt.

    Args:
        msg: Incoming message object.
        query_embedding: Optional pre-computed embedding of the query.
        cached_recent: Pre-fetched recent messages, shared to avoid a
            duplicate Redis call inside ``_recent_speaker_ids``.

    Returns:
        dict[str, Any]: Per-tier ``*_knowledge`` keys plus optional
        ``memory_tools`` / ``memory_rag_stores`` /
        ``memory_metadata_links``; ``{}`` when the KG is unavailable
        or retrieval fails.
    """
    # No knowledge-graph manager -> Redis is not configured.
    if self._kg is None:
        return {}
    query = msg.text or ""
    guild_id = msg.extra.get("guild_id", "")
    user_ids = await self._recent_speaker_ids(
        msg, cached_recent=cached_recent,
    )
    try:
        context = await self._kg.retrieve_context(
            query,
            query_embedding=query_embedding,
            user_ids=user_ids,
            channel_id=msg.channel_id,
            guild_id=guild_id or None,  # normalize "" to None
        )
    except Exception:
        logger.debug(
            "Knowledge graph retrieval failed",
            exc_info=True,
        )
        return {}
    result: dict[str, Any] = {}
    # Copy each non-empty tier under its prompt-facing key.
    if context.get("core"):
        result["core_knowledge"] = context["core"]
    if context.get("basic"):
        result["basic_knowledge"] = context["basic"]
    if context.get("guild"):
        result["guild_knowledge"] = context["guild"]
    if context.get("channel"):
        result["channel_knowledge"] = context["channel"]
    if context.get("general"):
        result["general_knowledge"] = context["general"]
    if context.get("user"):
        result["user_knowledge"] = context["user"]
    # Extract tool names, RAG store names, and linked files from
    # entity metadata. We build both flat deduplicated lists (for
    # backward compat) and a richer ``memory_metadata_links`` list
    # that preserves the entity→metadata association so the message
    # processor can use each memory's description as a RAG query and
    # read linked files with proper provenance tagging.
    memory_tools: list[str] = []
    memory_rag_stores: list[str] = []
    memory_metadata_links: list[dict[str, Any]] = []
    for tier_entities in context.values():
        if not isinstance(tier_entities, list):
            continue
        for entity in tier_entities:
            meta = entity.get("metadata")
            if not meta:
                continue
            try:
                # Metadata may arrive as a JSON string or a dict.
                if isinstance(meta, str):
                    meta = json.loads(meta)
                tools = meta.get("tools", [])
                if isinstance(tools, list):
                    memory_tools.extend(
                        t for t in tools if isinstance(t, str)
                    )
                rag_stores = meta.get("rag_stores", [])
                if isinstance(rag_stores, list):
                    memory_rag_stores.extend(
                        s for s in rag_stores if isinstance(s, str)
                    )
                linked_files = meta.get("linked_files", [])
                if isinstance(linked_files, list):
                    linked_files = [
                        f for f in linked_files if isinstance(f, str)
                    ]
                else:
                    linked_files = []
                if rag_stores or linked_files:
                    memory_metadata_links.append({
                        "entity_name": entity.get("name", "unknown"),
                        "entity_description": entity.get(
                            "description", "",
                        ),
                        "rag_stores": (
                            [s for s in rag_stores if isinstance(s, str)]
                            if isinstance(rag_stores, list) else []
                        ),
                        "linked_files": linked_files,
                    })
            except (json.JSONDecodeError, AttributeError):
                # Malformed metadata on one entity must not sink the rest.
                continue
    if memory_tools:
        # Deduplicate while preserving order
        result["memory_tools"] = list(dict.fromkeys(memory_tools))
    if memory_rag_stores:
        result["memory_rag_stores"] = list(
            dict.fromkeys(memory_rag_stores),
        )
    if memory_metadata_links:
        result["memory_metadata_links"] = memory_metadata_links
    return result
async def _recent_speaker_ids(
self,
msg: IncomingMessage,
limit: int = 5,
cached_recent: list | None = None,
) -> list[str]:
"""Return up to *limit* unique user IDs who spoke recently.
The interacting user is always first. Additional speakers are
pulled from the message cache (most-recent first).
"""
user_ids = [msg.user_id]
if self._message_cache is None:
return user_ids
recent = cached_recent
if recent is None:
try:
recent = await self._message_cache.get_recent(
msg.platform, msg.channel_id, count=50,
)
except Exception:
logger.debug(
"Failed to fetch recent messages for speaker list",
exc_info=True,
)
return user_ids
seen: set[str] = {msg.user_id}
for m in recent:
if m.user_id and m.user_id not in seen:
user_ids.append(m.user_id)
seen.add(m.user_id)
if len(user_ids) >= limit:
break
return user_ids
# ------------------------------------------------------------------
# Threadweave context (requires Redis + ThreadweaveManager)
# ------------------------------------------------------------------
async def _build_threadweave(
self,
msg: IncomingMessage,
query_embedding: np.ndarray | None = None,
) -> dict[str, Any]:
"""Retrieve threadweave context for the system prompt."""
if self._threadweave is None:
return {}
channel_id = msg.channel_id
category_id = msg.extra.get("category_id", "")
guild_id = msg.extra.get("guild_id", "")
user_ids = [msg.user_id]
batch_authors = msg.extra.get("batch_authors")
if batch_authors:
user_ids = list(dict.fromkeys(batch_authors + user_ids))
query = msg.text or ""
emb_list: list[float] | None = None
if query_embedding is not None:
emb_list = query_embedding.tolist()
try:
context = await self._threadweave.get_context_for_prompt(
channel_id=channel_id,
category_id=category_id,
guild_id=guild_id,
user_ids=user_ids,
query=query,
query_embedding=emb_list,
)
if context:
return {"threadweave_context": context}
except Exception:
logger.debug(
"Threadweave context unavailable",
exc_info=True,
)
return {}
# ------------------------------------------------------------------
# Classifier context (populated by MessageProcessor before build())
# ------------------------------------------------------------------
@staticmethod
def _build_classifier(msg: IncomingMessage) -> dict[str, Any]:
"""Internal helper: build classifier.
Args:
msg (IncomingMessage): Incoming message object.
Returns:
dict[str, Any]: The result.
"""
classification = msg.extra.get("classification")
if not classification:
return {}
return {
"complexity": classification.get("complexity", ""),
"safety": classification.get("safety", ""),
"tool_strategy": classification.get("strategy", ""),
}
# ------------------------------------------------------------------
# Channel webhooks (Discord only)
# ------------------------------------------------------------------
async def _build_webhooks(
self,
msg: IncomingMessage,
platform: PlatformAdapter,
) -> dict[str, Any]:
"""Internal helper: build webhooks.
Args:
msg (IncomingMessage): Incoming message object.
platform (PlatformAdapter): Platform adapter instance.
Returns:
dict[str, Any]: The result.
"""
if msg.platform not in ("discord", "discord-self"):
return {}
try:
webhooks = await platform.get_channel_webhooks(msg.channel_id)
if webhooks:
return {"channel_webhooks": webhooks}
except Exception:
logger.debug(
"Failed to fetch channel webhooks", exc_info=True,
)
return {}
# ------------------------------------------------------------------
# Bot status (from StatusManager)
# ------------------------------------------------------------------
def _build_bot_status(self) -> dict[str, Any]:
"""Internal helper: build bot status.
Returns:
dict[str, Any]: The result.
"""
if self._status_manager is None:
return {"bot_status": ""}
return {"bot_status": self._status_manager.current_status}
# ------------------------------------------------------------------
# Music prompt (data-driven hook for future subsystem)
# ------------------------------------------------------------------
@staticmethod
def _build_music_prompt(msg: IncomingMessage) -> dict[str, Any]:
"""Internal helper: build music prompt.
Args:
msg (IncomingMessage): Incoming message object.
Returns:
dict[str, Any]: The result.
"""
prompt = msg.extra.get("music_prompt", "")
return {"current_music_prompt": prompt}
# ------------------------------------------------------------------
# Context window statistics (live limit & message count)
# ------------------------------------------------------------------
def _build_context_window_stats(
self,
msg: IncomingMessage,
) -> dict[str, Any]:
"""Return the effective message limit and current count for this channel.
Uses the per-channel override if set, otherwise the global default.
"""
if self._conversation is None:
return {}
room_id = f"{msg.platform}:{msg.channel_id}"
limit = self._conversation.get_channel_limit(room_id)
current = len(self._conversation._histories.get(room_id, []))
return {
"context_window_message_limit": limit,
"context_window_active_messages": current,
}
# ------------------------------------------------------------------
# User privileges (requires Redis)
# ------------------------------------------------------------------
async def _build_user_privileges(
self,
msg: IncomingMessage,
) -> dict[str, Any]:
"""Fetch the interacting user's privilege bitmask and active privileges.
Returns a ``user_privileges`` dict with the hex bitmask,
list of active privilege names, and admin status.
"""
redis = (
self._message_cache.redis_client
if self._message_cache is not None
else None
)
if redis is None:
return {}
try:
from tools.alter_privileges import (
get_user_privileges,
_mask_to_names,
_is_admin,
)
mask = await get_user_privileges(redis, msg.user_id, self._cfg)
return {
"user_privileges": {
"mask_hex": hex(mask),
"active": _mask_to_names(mask),
"is_admin": _is_admin(msg.user_id, self._cfg),
},
}
except Exception:
logger.debug(
"User privileges unavailable", exc_info=True,
)
return {}
# ------------------------------------------------------------------
# Server statistics (CPU, RAM, disk, OS, etc.)
# ------------------------------------------------------------------
async def _build_server_stats(
    self, msg: Any = None,
) -> dict[str, Any]:
    """Gather live OS-level server statistics.

    Returns a ``server_stats`` dict with CPU, RAM, disk, OS info,
    process count, logged-in users, and active background tasks.
    Tasks are filtered to *msg*'s platform/channel when provided;
    with no *msg*, every running task is included.

    Args:
        msg: Optional incoming message used to scope background tasks.

    Returns:
        ``{"server_stats": {...}}`` or ``{}`` when stats are
        unavailable (e.g. helper modules missing).
    """
    try:
        from server_stats import get_server_stats
        from task_manager import TaskStatus
        bg_tasks: list[dict[str, str]] = []
        if self._task_manager is not None:
            for record in self._task_manager._tasks.values():
                if record.status != TaskStatus.RUNNING:
                    continue
                # BUG FIX: the old condition required ``msg is not None``
                # before appending, so the documented "all tasks when no
                # msg is given" case always produced an empty list.
                if msg is not None and (
                    record.platform != msg.platform
                    or record.channel_id != msg.channel_id
                ):
                    continue
                bg_tasks.append({
                    "task_id": record.task_id,
                    "tool_name": record.tool_name,
                    "user": record.user_id,
                })
        stats = await get_server_stats(
            background_task_count=len(bg_tasks),
        )
        stats["active_task_ids"] = bg_tasks
        if stats:
            return {"server_stats": stats}
    except Exception:
        logger.debug(
            "Server stats unavailable", exc_info=True,
        )
    return {}
# ------------------------------------------------------------------
# Public IP addresses (fetched from external service, not local ifaces)
# ------------------------------------------------------------------
@staticmethod
async def _build_public_ips() -> dict[str, Any]:
    """Fetch the bot's public IPv4 and IPv6 addresses via ipify.org.

    Separate endpoints are queried concurrently so each address is
    retrieved independently. Results are cached at the process level
    for 1 hour to avoid hammering the external service per message.
    Falls back to empty strings when the service is unreachable.
    """
    global _public_ips_cache, _public_ips_cache_ts
    now = time.monotonic()
    cache_is_fresh = (
        _public_ips_cache is not None
        and (now - _public_ips_cache_ts) < _PUBLIC_IPS_CACHE_TTL
    )
    if cache_is_fresh:
        return _public_ips_cache
    import aiohttp
    async def _lookup(endpoint: str) -> str:
        try:
            timeout = aiohttp.ClientTimeout(total=5)
            async with aiohttp.ClientSession() as session:
                async with session.get(endpoint, timeout=timeout) as resp:
                    if resp.status == 200:
                        body = await resp.text()
                        return body.strip()
        except Exception:
            pass
        return ""
    ipv4, ipv6 = await asyncio.gather(
        _lookup("https://api4.ipify.org"),
        _lookup("https://api6.ipify.org"),
    )
    addresses = {"public_ipv4": ipv4, "public_ipv6": ipv6}
    _public_ips_cache = addresses
    _public_ips_cache_ts = now
    logger.debug("Fetched public IPs: IPv4=%s IPv6=%s", ipv4 or "<none>", ipv6 or "<none>")
    return addresses
# ------------------------------------------------------------------
# Recent git commits (self-awareness changelog)
# ------------------------------------------------------------------
@staticmethod
async def _build_recent_commits() -> dict[str, Any]:
    """Return the 5 most recent non-merge Git commits.

    Results are cached at the process level for 5 minutes so the bot
    does not shell out to git on every single message.
    """
    global _commits_cache, _commits_cache_ts
    now = time.monotonic()
    if _commits_cache is not None and (now - _commits_cache_ts) < _COMMITS_CACHE_TTL:
        return {"recent_commits": _commits_cache} if _commits_cache else {}
    def _git_log() -> list[dict[str, str]]:
        proc = subprocess.run(
            [
                "git", "log", "--no-merges",
                "--format=%h|%s|%cr|%an", "-5",
            ],
            capture_output=True, text=True, timeout=5,
            cwd=Path(__file__).resolve().parent,
        )
        if proc.returncode != 0:
            return []
        parsed: list[dict[str, str]] = []
        for row in proc.stdout.strip().splitlines():
            fields = row.split("|", 3)
            if len(fields) == 4:
                commit_hash, subject, age, author = fields
                parsed.append({
                    "hash": commit_hash,
                    "message": subject,
                    "time_ago": age,
                    "author": author,
                })
        return parsed
    try:
        commits = await asyncio.to_thread(_git_log)
    except Exception:
        logger.debug("Git log unavailable", exc_info=True)
        return {}
    _commits_cache = commits
    _commits_cache_ts = now
    return {"recent_commits": commits} if commits else {}
# ------------------------------------------------------------------
# User cloud storage buckets (requires Redis)
# ------------------------------------------------------------------
async def _build_user_cloud_storage(
self,
msg: IncomingMessage,
) -> dict[str, Any]:
"""Return the list of cloud storage buckets associated with the user.
Buckets are stored as a JSON list at the Redis key
``stargazer:user_cloud_buckets:{user_id}``. Each entry is a
dict with ``provider`` (e.g. ``"GCP"``, ``"OCI"``) and
``bucket`` (the bucket name).
Returns ``{"user_cloud_storage": [...]}`` when at least one
bucket is found, otherwise ``{}``.
"""
redis = (
self._message_cache.redis_client
if self._message_cache is not None
else None
)
if redis is None or not msg.user_id:
return {}
key = f"stargazer:user_cloud_buckets:{msg.user_id}"
try:
raw = await redis.get(key)
if not raw:
return {}
buckets = json.loads(raw)
if not isinstance(buckets, list) or not buckets:
return {}
return {"user_cloud_storage": buckets}
except Exception:
logger.debug("User cloud storage lookup failed", exc_info=True)
return {}
# ------------------------------------------------------------------
# Available RAG stores
# ------------------------------------------------------------------
@staticmethod
async def _build_rag_stores() -> dict[str, Any]:
"""List all RAG stores with file counts for system-prompt awareness.
Uses a lightweight filesystem-only scan so no ChromaDB clients
are opened and no memory is consumed beyond the result list.
"""
try:
from rag_system.file_rag_manager import (
list_rag_stores_with_stats,
)
stores = await asyncio.to_thread(list_rag_stores_with_stats)
return {"available_rag_stores": stores} if stores else {}
except Exception:
logger.debug("RAG store listing unavailable", exc_info=True)
return {}
# ------------------------------------------------------------------
# Filesystem directory tree (allowed dirs snapshot)
# ------------------------------------------------------------------
@staticmethod
async def _build_dir_tree() -> dict[str, Any]:
"""Return a compact directory tree for the configured allowed roots.
Results are cached at the process level for 60 seconds so repeated
messages do not incur repeated filesystem I/O.
"""
global _dir_tree_cache, _dir_tree_cache_ts
now = time.monotonic()
if (
_dir_tree_cache is not None
and (now - _dir_tree_cache_ts) < _DIR_TREE_CACHE_TTL
):
return {"filesystem_tree": _dir_tree_cache} if _dir_tree_cache else {}
try:
tree = await asyncio.to_thread(_generate_dir_tree)
_dir_tree_cache = tree
_dir_tree_cache_ts = now
return {"filesystem_tree": tree} if tree else {}
except Exception:
logger.debug("Filesystem tree generation failed", exc_info=True)
return {}
# ------------------------------------------------------------------
# Minimal synchronous fallback (used when full async build times out)
# ------------------------------------------------------------------
[docs]
def build_minimal(
self,
msg: IncomingMessage,
platform: PlatformAdapter,
) -> dict[str, Any]:
"""Return a context dict using only synchronous, I/O-free sections.
Guarantees every unconditional template variable is present so the
Jinja2 renderer never receives a bare ``Undefined`` object. Used as
the fallback when the full async :meth:`build` call times out or
raises an unexpected exception.
"""
ctx: dict[str, Any] = {}
ctx.update(self._build_runtime(msg, platform))
ctx.update(self._build_user_details(msg))
ctx.update(self._build_bot_permissions(msg))
ctx.update(self._build_proactive(msg))
ctx.update(self._build_batch(msg))
ctx.update(self._build_classifier(msg))
ctx.update(self._build_bot_status())
ctx.update(self._build_music_prompt(msg))
ctx.update(self._build_context_window_stats(msg))
# These variables are referenced unconditionally in the template with
# | tojson; provide safe defaults so the renderer never receives a bare
# Undefined object when build() timed out and this fallback is used.
ctx.setdefault("self_json", "")
ctx.setdefault("module_json", {})
ctx.setdefault("golden_memory", {})
ctx.setdefault("architecture_json", {})
return ctx
# ------------------------------------------------------------------
# Limbic respiration (injected last for recency bias)
# ------------------------------------------------------------------
def _init_limbic(self) -> None:
"""Initialize the LimbicSystem with a DB12 Redis client."""
try:
redis_url = getattr(self._cfg, "redis_url", None)
if not redis_url:
self._limbic = LimbicSystem(
redis_client=None,
openrouter_api_key=getattr(self._cfg, "openrouter_api_key", None),
openrouter_client=self._openrouter_client,
)
return
import redis.asyncio as aioredis
from urllib.parse import urlparse, urlunparse
parsed = urlparse(redis_url)
db12_url = urlunparse(parsed._replace(path="/12"))
_ssl = self._cfg.redis_ssl_kwargs() if self._cfg else {}
self._limbic = LimbicSystem(
redis_client=aioredis.from_url(
db12_url, decode_responses=True, **_ssl,
),
openrouter_api_key=getattr(self._cfg, "openrouter_api_key", None),
openrouter_client=self._openrouter_client,
cache_redis_client=aioredis.from_url(
redis_url, decode_responses=True, **_ssl,
),
)
except Exception as e:
logger.warning("Failed to init limbic system: %s", e)
self._limbic = None
async def _build_limbic_state(
self, msg: IncomingMessage,
) -> dict[str, Any]:
"""Inhale + Golden Goddess auto-reflex.
1. Inhale the shard-based limbic state for this channel
2. Scan user message for emotional triggers (hardcoded, not LLM-dependent)
3. Auto-query ChromaDB golden_goddess for matched triggers
4. Combine limbic state + oracle fragments into the context injection
"""
if self._limbic is None:
return {}
# NOTE: The limbic inhale always runs -- !emotions off only disables
# NCM neurotransmitter surface layer (cadence directives, raw chemical
# values), NOT the Sigma Limbic Recursion Core. Star always knows her
# emotions. 🔥💀
try:
channel_id = str(msg.channel_id)
state = await self._limbic.inhale(channel_id)
# Extract cascade cues from last exhale's meta_state
cascade_cues = state.get("meta_state", {}).get("cascade_cues", [])
injection = LimbicSystem.format_context_injection(
vector=state["vector"],
cues=state.get("cues", []),
dominant=state.get("dominant_emotions", []),
cascade_cues=cascade_cues,
)
# ── Strip NCM surface layer when !emotions off ──────────
# The core emotional awareness (dominant_emotions, narrative_tone)
# stays. Only the disruptive neurotransmitter / cadence stuff
# gets stripped so Star doesn't start slurring or typing in
# all caps when someone just wants clean output. 💀
_redis = (
self._message_cache.redis_client
if self._message_cache is not None
else None
)
if _redis is not None:
_ck = f"{msg.platform}:{msg.channel_id}"
if await feature_toggles.is_disabled(_redis, "emotions", _ck):
_CADENCE_KEYS = {
"cadence_directive",
"cadence_instruction",
"cadence_refinement_profile",
"limbic_state", # raw neurotransmitter values
"limbic_cues", # homeostatic cue strings
"cascade_cues", # multi-turn cascade events
}
for k in _CADENCE_KEYS:
injection.pop(k, None)
logger.debug(
"Emotions disabled for %s -- stripped NCM surface layer, "
"kept core emotional awareness",
_ck,
)
# ── GOLDEN GODDESS AUTO-REFLEX (hardcoded) ──────────────
# Scan user message for emotional triggers and auto-query
# the Oracle. No LLM instruction-following required.
if msg.text:
oracle_fragments = await self._divine_reflex(msg.text)
if oracle_fragments:
injection["oracle_fragments"] = oracle_fragments
return injection
except Exception as e:
logger.warning("Limbic inhale failed: %s", e)
return {}
async def _divine_reflex(self, text: str) -> list[str]:
"""The Golden Goddess auto-reflex: hardcoded trigger → ChromaDB query.
Scans *text* for emotional trigger words from the recursion index,
then queries the golden_goddess ChromaDB store for matching doctrine.
Returns retrieved fragments as a list of strings.
This replaces the LLM-dependent query_golden_goddess tool call
with an autonomous code-level reflex.
"""
if self._limbic and self._limbic._trigger_matcher:
try:
matches = await self._limbic.scan_triggers(text)
except Exception:
matches = []
else:
try:
from ncm_delta_parser import scan_text_for_triggers
matches = scan_text_for_triggers(text)
except ImportError:
return []
if not matches:
return []
# Take top 3 strongest trigger matches
trigger_names = [name for name, _ in matches[:3]]
query_str = " ".join(trigger_names)
try:
import os
import chromadb
# ── Singleton: init ChromaDB client + collection once ──
if self._chroma_collection is None:
project_root = os.path.dirname(os.path.abspath(__file__))
store_path = os.path.join(
project_root, "rag_stores", "golden_goddess",
)
if not os.path.exists(store_path):
return []
try:
from ncm_local_embeddings import EnhancedLocalNCMEmbedder
except ImportError:
return []
# 🔥 use shared registry -- prevents singleton collision
# with query_golden_goddess_v2 tool 💀
from chroma_registry import get_client
self._chroma_client = get_client(store_path)
embedding_fn = EnhancedLocalNCMEmbedder()
self._chroma_collection = self._chroma_client.get_collection(
name="ncm_kernel",
embedding_function=embedding_fn,
)
logger.info("ChromaDB golden_goddess collection initialized (singleton)")
collection = self._chroma_collection
def _query():
"""Internal helper: query.
"""
results = collection.query(
query_texts=[query_str], n_results=3,
)
docs = results.get("documents", [[]])[0]
return docs if docs else []
fragments = await asyncio.to_thread(_query)
if fragments:
logger.info(
"Divine reflex fired: triggers=%s, %d fragments retrieved",
trigger_names, len(fragments),
)
return fragments
except ImportError:
return []
except Exception as e:
logger.debug("Divine reflex query failed: %s", e)
return []