Source code for config

"""Bot configuration loaded from config.yaml with environment variable overrides.

Supports a ``platforms`` list so multiple chat platforms (Matrix, Discord, ...)
can be configured independently alongside the shared LLM / web settings.
"""

from __future__ import annotations

import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import yaml

from user_llm_config import sanitize_llm_http_url

import base64
import logging
import socket
from datetime import datetime
from typing import Literal, Optional
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from observability import observability

logger = logging.getLogger(__name__)


class RedisSSLConfig(BaseModel):
    """Connection and mutual-TLS settings for a Redis endpoint.

    ``host`` and the three credential paths are required; ``enabled``,
    ``port`` and ``ssl_cert_reqs`` carry defaults. Every credential path is
    checked by :meth:`validate_paths_exist` when the model is validated.
    """

    enabled: bool = True
    host: str = Field(..., min_length=1)
    port: int = Field(default=6379, ge=1, le=65535)
    ssl_certfile: str = Field(..., description="Path to client mTLS certificate")
    ssl_keyfile: str = Field(..., description="Path to client mTLS private key")
    ssl_ca_certs: str = Field(..., description="Path to trusted Redis CA root")
    ssl_cert_reqs: Literal["required", "none"] = "required"

    @field_validator("ssl_certfile", "ssl_keyfile", "ssl_ca_certs")
    @classmethod
    def validate_paths_exist(cls, v: str) -> str:
        """Reject a credential path that is not an existing regular file.

        Registered as the field validator for ``ssl_certfile``,
        ``ssl_keyfile`` and ``ssl_ca_certs``. The only work done is a
        :meth:`pathlib.Path.is_file` check on the supplied value.

        Args:
            v (str): Candidate filesystem path for one credential file.

        Returns:
            str: *v* unchanged when it names an existing file.

        Raises:
            ValueError: If *v* is not an existing regular file (this also
                covers a path that exists but is a directory).
        """
        if Path(v).is_file():
            return v
        raise ValueError(f"Cryptographic credential path does not exist: {v}")
class BotSettings(BaseSettings):
    """Bot-level settings model built on ``pydantic_settings.BaseSettings``.

    The settings source is configured with ``env_file=".env"``,
    ``env_prefix="STAR_"`` and ``extra="ignore"``. ``bot_token`` and
    ``redis`` have no default and must therefore be supplied.
    """

    model_config = SettingsConfigDict(env_file=".env", env_prefix="STAR_", extra="ignore")

    # Required; must be at least 10 characters long.
    bot_token: str = Field(..., min_length=10)
    admin_user_ids: list[str] = Field(default_factory=list)
    dangerous_command_warning_enabled: bool = True
    # Constrained to the closed interval [0.0, 1.0].
    dangerous_command_similarity_threshold: float = Field(default=0.35, ge=0.0, le=1.0)
    # Required nested Redis mTLS settings.
    redis: RedisSSLConfig
class RuntimeState:
    """Mutable holder for the wallet-key status of the running process.

    Both names are class-level defaults rather than dataclass fields:
    assigning to them on an instance shadows the default for that instance
    only.
    """

    # True once a valid wallet master key has been stored on the instance.
    wallet_key_active: bool = False

    # Raw key bytes, or ``None`` while no key is loaded.
    wallet_master_key: Optional[bytes] = None
def _wallet_utc_timestamp() -> str:
    """Return the current UTC time as ISO-8601 text with a ``Z`` suffix."""
    # Function-scope import: the module only imports ``datetime`` itself.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12. Take an aware UTC
    # time and drop tzinfo so the rendered text keeps the exact
    # ``YYYY-MM-DDTHH:MM:SS[.ffffff]Z`` shape the alert metadata always had.
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"


def _deactivate_wallet_key(
    state: RuntimeState,
    platform_type: str,
    alert_message: str,
    error: Optional[str] = None,
) -> None:
    """Clear the wallet key on *state* and raise a CRITICAL observability alert."""
    # Reset the state before alerting so a failure inside the alerting call
    # can never leave a previously loaded key marked as active.
    state.wallet_key_active = False
    state.wallet_master_key = None

    metadata = {
        "hostname": socket.gethostname(),
        "component": "wallet_key_utils",
        "platform": platform_type,
        "timestamp": _wallet_utc_timestamp(),
    }
    if error is not None:
        metadata["error"] = error
    observability.alert("CRITICAL", alert_message, metadata=metadata)


def load_wallet_key_non_halting(
    state: RuntimeState, platform_type: str = "eth"
) -> None:
    """Load the wallet master key from the environment without aborting startup.

    Reads ``STAR_{PLATFORM}_WALLET_MASTER_KEY`` (``platform_type``
    upper-cased), decodes it with :func:`base64.urlsafe_b64decode`, and
    requires the result to be exactly 32 bytes. On success the raw bytes are
    stored on *state* and ``wallet_key_active`` is set to ``True``.

    A missing/empty variable or any decoding/length failure is treated as a
    degraded-but-bootable condition: the problem is logged at ERROR level,
    both wallet fields on *state* are cleared, and ``observability.alert`` is
    called with severity ``"CRITICAL"`` and metadata containing the hostname,
    component name, platform, a UTC timestamp, and (for decode failures) the
    error text. Exceptions raised by the alerting call itself are not caught
    here.

    Args:
        state (RuntimeState): Mutable container whose ``wallet_master_key``
            and ``wallet_key_active`` attributes are updated in place.
        platform_type (str): Platform selector (e.g. ``"eth"``) used to build
            the environment variable name.

    Returns:
        None: The outcome is communicated by mutating *state*.
    """
    env_var_name = f"STAR_{platform_type.upper()}_WALLET_MASTER_KEY"
    b64_key = os.environ.get(env_var_name)

    if not b64_key:
        border = "#" * 80
        logger.error(
            f"\n{border}\n"
            f"SECURITY CRITICAL WARNING: Missing environment variable '{env_var_name}'.\n"
            "Stargazer v3 will boot in DEGRADED STATE. Wallet features are completely deactivated.\n"
            f"{border}"
        )
        _deactivate_wallet_key(
            state,
            platform_type,
            "Wallet master cryptographic keys are missing. Startup proceeding in degraded mode.",
        )
        return

    try:
        raw_key = base64.urlsafe_b64decode(b64_key.encode())
        if len(raw_key) != 32:
            raise ValueError(
                f"Invalid key length: must be exactly 32 bytes (256-bit). Got {len(raw_key)} bytes."
            )
    except Exception as e:
        # Deliberately broad: a bad secret must degrade wallet features, not
        # stop the process from booting.
        logger.error(
            "Failed to decode cryptographic master key: %s", str(e), exc_info=True
        )
        _deactivate_wallet_key(
            state,
            platform_type,
            "Wallet master key decoding failure. Wallet services deactivated.",
            error=str(e),
        )
        return

    state.wallet_master_key = raw_key
    state.wallet_key_active = True
    logger.info("Successfully loaded wallet master key. Wallet services active.")
# Default channels where the entire NCM stack (inhale/exhale/tools/cadence) is off. # Aesir-Hall (Discord). DEFAULT_NCM_FULLY_DISABLED_CHANNELS: frozenset[str] = frozenset( { "discord:1361023131107590184", } ) DEFAULT_KG_FULL_USER_MEMORY_IDS: tuple[str, ...] = ("281484046150926336",) # Default shell-tool allowlist when ``shell_authorized_user_ids`` is omitted from YAML. _DEFAULT_SHELL_AUTHORIZED_USER_IDS: tuple[str, ...] = ( "82303438955753472", "1063654597937336372", "517538246788513821", "389067553940439053", "829047047633764402", ) def _deep_merge_dict(base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]: """Recursively merge *overlay* on top of *base*, returning a new dict. Produces a deep union of two plain mapping trees: where both sides hold a nested dict under the same key the function recurses, and for every other key the ``overlay`` value wins outright. Neither input is mutated — a fresh ``dict`` is built at each level — so callers can reuse the originals safely. Used purely for config layering: :meth:`Config.load` calls it to fold the optional ``data/skills_runtime.yaml`` overlay onto the base ``config.yaml`` data, and it recurses into itself for nested sections. It is also exercised directly by ``tests/test_config_merge.py``. Pure in-memory computation with no Redis/KG/LLM/HTTP/filesystem side effects. Args: base (dict[str, Any]): The lower-priority mapping (e.g. ``config.yaml``). overlay (dict[str, Any]): The higher-priority mapping whose values take precedence on conflicts. Returns: dict[str, Any]: A new merged mapping; values from *overlay* override those in *base* except where both are dicts, which are merged in turn. """ out = dict(base) for key, val in overlay.items(): if key in out and isinstance(out[key], dict) and isinstance(val, dict): out[key] = _deep_merge_dict(out[key], val) else: out[key] = val return out
@dataclass
class PlatformConfig:
    """Settings for one chat-platform binding (Matrix, Discord, ...).

    ``type`` and ``enabled`` are first-class attributes; every other
    platform-specific option lives in the free-form :attr:`settings` mapping
    and is read through :meth:`get`. The class only holds data and performs
    no I/O.
    """

    type: str = ""
    """Platform identifier such as ``"matrix"`` or ``"discord"``."""

    enabled: bool = True
    """Whether this platform should be started."""

    # Free-form, platform-specific options
    settings: dict[str, Any] = field(default_factory=dict)
    """Every remaining key of the platform's YAML block."""

    def get(self, key: str, default: Any = None) -> Any:
        """Return the platform setting stored under *key*, else *default*.

        Reads only the in-memory :attr:`settings` mapping. A key that is
        present with the value ``None`` returns ``None``, not *default*.

        Args:
            key (str): Name of the setting to look up in ``settings``.
            default (Any): Value returned when *key* is absent. Defaults to
                ``None``.

        Returns:
            Any: The stored value for *key*, or *default* when not present.
        """
        options = self.settings
        return options[key] if key in options else default
[docs] @dataclass class Config: # --- Shared LLM settings --- """Central runtime configuration for the whole bot, loaded from YAML and env. The single source of truth for every tunable in the system — LLM/OpenRouter settings, Redis/Sentinel and mTLS wiring, the pgvector store, the knowledge graph, Limbic Anchoring, persona preferences, the web UI, OAuth, per-platform bindings, and dozens of feature toggles. It is built once via :meth:`Config.load`, which reads ``config.yaml`` (plus an optional skills overlay), applies environment-variable overrides, and normalises paths and lists into the typed fields declared here. A single loaded instance is shared across all five microservices (gateway, inference, agents, consolidation, web); each service calls :meth:`Config.load` at startup and threads the object through to tools via the ``ToolContext``. Beyond holding data it exposes a few behavioural helpers — :meth:`redis_ssl_kwargs`, :meth:`redis_resilience_kwargs`, and :meth:`build_async_redis_client` — that turn these fields into live ``redis.asyncio`` clients, and :meth:`__post_init__` syncs the global ``ego_ablation`` flag. A dataclass; construction itself performs no network I/O beyond the path canonicalisation done in :meth:`load`. """ api_key: str = "" gemini_api_key: str = "" llm_base_url: str = "http://localhost:3000/openai" model: str = "x-ai/grok-4.1-fast" temperature: float = 0.7 max_tokens: int = 60000 # Nucleus sampling (OpenAI-compatible chat-completions); default 0.99. 
top_p: float = 0.99 # httpx.AsyncClient timeouts for OpenRouter chat-completions (see openrouter_client.OpenRouterClient) openrouter_http_connect_timeout_seconds: float = 10.0 openrouter_http_read_timeout_seconds: float = 1200.0 openrouter_http_write_timeout_seconds: float = 120.0 openrouter_http_pool_timeout_seconds: float = 180.0 system_prompt_file: str = "system_prompt.j2" max_history: int = 100 tools_dir: str = "tools" # --- Dedicated tools-execution microservice (stargazer-tools) --- tools_service_mode: str = "in_process" """How the inference tier executes tools. * ``"in_process"`` (default): run tools locally, exactly as before the split. Safe baseline — the dedicated tools service can be deployed dormant. * ``"remote"``: delegate non-pinned tools to the dedicated ``tools`` service over ``sg:stream:tools``; gateway-pinned tools still go to the gateway and inference-pinned compound tools (``INFERENCE_PINNED_TOOLS``) still run locally. * ``"shadow"``: run remote AND in-process, keep the in-process result, log divergences (rollout validation only).""" tools_exec_timeout: float = 120.0 """Seconds the inference tier waits for a delegated tool result before giving up. Must exceed the TaskManager background timeout so legitimately slow tools are not abandoned early.""" tools_local_fallback: bool = True """When delegation fails (tools service down / timeout), fall back to running the tool in-process for that call. Only meaningful while inference still imports the tool modules (rollout). 
Set ``False`` for the lean end state, where a delegation failure returns a recoverable error string to the model instead.""" tools_force_in_process: list[str] = field(default_factory=list) """Per-tool kill switch: tool names that must always run in-process on inference regardless of ``tools_service_mode`` (granular rollback).""" tools_require_session_record: bool = True """When True (default), the tools service rejects any delegated request whose ``trace_id`` has no authenticated session record (written by inference) — identity is resolved from that record, never from the request envelope. Set False only as a rollout escape hatch (falls back to envelope identity with a warning); not recommended once privileged tools run remotely.""" # --- Tool permissions (tool_name -> list of allowed user IDs) --- tool_permissions: dict[str, list[str]] = field(default_factory=dict) # --- API keys for external services (Brave Search, Vultr, etc.) --- api_keys: dict[str, Any] = field(default_factory=dict) # --- Prowlarr (Docker *arr stack; indexer search API on the host) --- prowlarr_base_url: str = "" prowlarr_api_key: str = "" # --- Cursor IDE (cursor-agent CLI used by send_cursor_prompt, # import_mcp_tool, and workflow_subagent_tools). Resolution order # in tools/_cursor_api_key.py: per-user `set_user_api_key # service=cursor` → channel/global pool → ``CURSOR_API_KEY`` env → # this field. Never inline the key in tool source — READ_TOOL_CODE # would expose it. cursor_api_key: str = "" # --- Redis message cache --- redis_url: str = "" redis_sentinels: list[str] = field(default_factory=list) redis_sentinel_master: str = "falkordb" redis_tls_cert: str = "" redis_tls_key: str = "" redis_tls_ca: str = "" redis_tls_verify_peer: bool = True # --- Redis failover resilience --- # Number of automatic retries (with exponential backoff) applied to every # Redis command. 
This lets reads/writes ride through a Sentinel failover # window (master re-election) instead of raising on the first attempt. redis_max_retries: int = 3 # Periodic PING health-check interval (seconds) so stale connections to a # demoted master are detected and recycled. 0 disables. redis_health_check_interval: float = 15.0 # Enable TCP keepalive so dead peers are noticed promptly. redis_socket_keepalive: bool = True # Connection-establishment timeout (seconds). Does NOT affect blocking reads # (XREAD/SUBSCRIBE), so it is safe to set on the shared client. 0 disables. redis_socket_connect_timeout: float = 5.0 # Per-command socket read timeout (seconds). Left at 0 (disabled) by default # because the MessageCache client is shared with blocking stream/pubsub # consumers; a global read timeout would break their long-lived XREAD/listen. redis_socket_timeout: float = 0.0 embedding_model: str = "google/gemini-embedding-001" # --- Deferred embedding batching --- embedding_batch_size: int = 50 embedding_flush_interval: float = 3600.0 # --- Vector store (Postgres + pgvector) --- # Backs all RAG / memory vector storage (see vector_store.py). One DB, # one schema per store, halfvec(3072) + HNSW (L2). A full DSN, if set, # overrides the individual host/port/etc fields. vector_pg_dsn: str = "" vector_pg_host: str = "10.10.0.7" vector_pg_port: int = 5432 vector_pg_database: str = "stargazer_filerag" vector_pg_user: str = "stargazer" vector_pg_password: str = "" vector_pg_sslmode: str = "prefer" vector_pg_min_size: int = 1 vector_pg_max_size: int = 8 # --- Knowledge graph --- kg_extraction_model: str = "gemini-3-flash" kg_max_hops: int = 2 # Per-entity-label vector KNN pool before seed thresholding and cap. kg_seed_top_k: int = 64 kg_seed_limit: int = 15 # FalkorDB cosine scores for this stack are often ~0.35–0.45 for # plausible matches; stricter values (e.g. 0.65) are supported when # your index/embeddings produce higher similarities. 
kg_seed_similarity_threshold: float = 0.38 # Relax seed similarity floor for category=user when retrieved/stored # ratio is below target (helps heavy-memory users surface more KG facts). kg_seed_dynamic_threshold_enabled: bool = True kg_seed_dynamic_threshold_target_ratio: float = 0.10 kg_seed_dynamic_threshold_min: float = 0.20 kg_seed_dynamic_threshold_min_stored: int = 5 kg_full_user_memory_ids: list[str] = field( default_factory=lambda: list(DEFAULT_KG_FULL_USER_MEMORY_IDS), ) kg_user_seed_min: int = 10 kg_user_candidate_limit: int = 100 kg_lore_candidate_limit: int = 40 kg_lore_seed_min: int = 5 kg_meta_candidate_limit: int = 40 kg_meta_seed_min: int = 5 kg_recent_speaker_limit: int = 10 kg_min_edge_weight: float = 0.0 kg_default_edge_weight: float = 0.8 kg_retrieval_hop_decay: float = 0.8 # Max neighbors returned from graph expansion (0 = unlimited). kg_expansion_neighbor_limit: int = 500 kg_max_context_entities: int = 30 kg_entity_dedup_threshold: float = 0.90 kg_relationship_decay_factor: float = 0.95 kg_per_message_extraction: bool = False kg_min_message_length: int = 100 kg_per_user_extraction_limit: int = 5 kg_extraction_channel_hints: dict[str, str] = field(default_factory=dict) """Optional `platform:channel_id` -> human label for KG extraction prompts.""" # --- Persona preference memory --- persona_preferences_enabled: bool = True """Master toggle for the persona preference memory system.""" persona_pref_extraction_enabled: bool = True """Whether auto-extraction of preferences from bot responses is active.""" persona_pref_extraction_rate_limit_seconds: int = 180 """Minimum seconds between auto-extractions per channel.""" persona_pref_extraction_min_response_length: int = 150 """Skip extraction for responses shorter than this many characters.""" persona_pref_injection_max_count: int = 8 """Maximum number of preferences to inject per inference.""" persona_pref_injection_max_chars: int = 2000 """Character budget for injected preferences per inference.""" 
persona_pref_base_persona_id: str = "stargazer" """Identifier for the base persona (used when no egregore is active).""" # --- Narrative Subsystem Overrides & Absolute Bypass Lists --- overall_user_id_absolute_override_list: list[str] = field( default_factory=lambda: ["517538246788513821", "82303438955753472", "1063654597937336372"] ) overall_channel_id_absolute_override_list: list[str] = field( default_factory=lambda: ["discord:1424991476668170351"] ) egregores_global_disabled: bool = False loopfield_global_disabled: bool = False ego_ablation_global_disabled: bool = False proactive_global_disabled: bool = False ncm_global_disabled: bool = False flash_mirror_global_disabled: bool = False anamnesis_global_disabled: bool = True cart_lock_global_enabled: bool = False lore_amplifier_global_disabled: bool = False # --- Mementropic (NCM ledger + late fusion + embedding drift on recall) --- mementropic_late_fusion_enabled: bool = True mementropic_semantic_weight: float = 0.65 mementropic_resonance_weight: float = 0.35 mementropic_ledger_max_entries: int = 50_000 mementropic_reconsolidation_enabled: bool = True mementropic_reconsolidation_learning_rate: float = 0.03 mementropic_reconsolidation_max_step: float = 0.25 mementropic_reconsolidation_max_entities: int = 24 # --- Limbic Anchoring & Sunset Processor --- ka_enabled: bool = True """Master toggle for the Limbic Anchoring system.""" # Worker ka_batch_size: int = 20 ka_sleep_interval_seconds: int = 300 ka_max_anchors_per_channel_per_day: int = 300 ka_dlq_retry_limit: int = 3 ka_backpressure_threshold: int = 500 ka_backpressure_recovery: int = 50 ka_batch_min_size: int = 10 """Minimum backlog depth required before a batch is processed. Channels with fewer than this many pending messages are skipped (deferred) until more accumulate, preventing wasteful single-message LLM calls. 
Default: 10.""" ka_epoch_size: int = 5000 ka_fast_path_prefix: str = "!remember" """Command prefix that triggers the fast-path explicit anchoring route.""" ka_extraction_http_timeout_seconds: float = 360.0 """Read timeout (seconds) for the extraction LLM HTTP call via the local proxy. Large batches with rich context can take 150-250 s on Gemini Flash; 360 s provides a comfortable margin. Raise further if timeouts persist on backpressure-doubled batches (up to ~40 messages).""" ka_bootstrap_mode: str = "full" """Bootstrap mode for new channels: 'latest' (jump to present) or 'full' (start from beginning).""" ka_max_backlog_messages: int = 1000 """Max messages to look back when bootstrapping with 'latest' (e.g. to catch very recent context).""" # Heuristic Gates ka_noise_gate_enabled: bool = True """Toggle for the heuristic noise filtering system. If False, all messages within the daily budget are anchored.""" ka_length_minimum: int = 20 ka_similarity_familiar: float = 0.45 ka_similarity_anomaly: float = 0.15 ka_density_threshold: float = 0.4 ka_tool_rate_limit_per_user_per_hour: int = 3 ka_significant_tools: list = field( default_factory=lambda: [ "search_web", "read_file", "kg_search_entities", "unsandboxed_python_tool", ] ) """Tool whitelist for Tool Gate; defaults to a safe production set.""" # Catch-up Gate — relaxed thresholds applied when status=="catch-up" # (backlog > ka_backpressure_threshold). Wider similarity band and a # higher daily budget allow high-backlog channels to drain faster. ka_catchup_enabled: bool = True """Toggle catch-up gate relaxation. 
Set False to use normal thresholds always.""" ka_catchup_similarity_familiar: float = 0.55 """Familiar-path threshold during catch-up mode (wider than normal 0.45).""" ka_catchup_similarity_anomaly: float = 0.20 """Anomaly-path threshold during catch-up mode (lower noise band floor).""" ka_catchup_density_threshold: float = 0.40 """Density threshold during catch-up mode.""" ka_catchup_max_anchors_per_day: int = 3000 """Daily anchor budget per channel during catch-up mode.""" # Sunset Processor ka_sunset_interval_seconds: int = 43200 ka_sunset_window_start_days: int = 90 ka_sunset_window_end_days: int = 80 ka_sunset_daily_summaries_per_epoch: int = 10 ka_sunset_token_reduction_threshold: int = 60000 ka_sunset_hydration_similarity: float = 0.92 # Garbage Collector ka_gc_orphan_age_days: int = 95 # Deduplication Worker ka_dedup_enabled: bool = True """Master toggle for the background KG deduplication worker.""" ka_dedup_interval_seconds: int = 300 """How often (seconds) the dedup cycle runs. Default: 300 (every 5 minutes).""" ka_dedup_semantic_threshold: float = 0.92 """Cosine similarity floor for Phase B (semantic) candidate pairs.""" ka_dedup_structural_batch_size: int = 200 """Max cross-label name collisions resolved per Phase A cycle.""" ka_dedup_semantic_batch_size: int = 50 """Max LLM synthesis calls issued per Phase B cycle.""" ka_dedup_llm_retry_limit: int = 2 """Max LLM retries per candidate pair before the pair is skipped.""" ka_dedup_neighbourhood_limit: int = 10 """Max relationship samples fetched per entity for the synthesis prompt.""" # Organization Worker ka_org_enabled: bool = True """Master toggle for the background KG organization worker.""" ka_org_interval_seconds: int = 600 """How often (seconds) the organization cycle runs. Default: 10 minutes.""" ka_org_hub_degree_threshold: int = 2 """Relationship degree above which a node is considered a hub. 
Default: 2.""" ka_org_max_hubs_per_cycle: int = 5 """Max number of hubs evaluated per wake cycle.""" ka_org_max_llm_calls_per_cycle: int = 50 """Max LLM calls allowed in a single organization cycle to avoid rate limits.""" ka_org_llm_delay_seconds: float = 0.5 """Sleep delay between LLM calls in the organization loop to avoid rate limits.""" # Accumulation Merge ka_accum_merge_enabled: bool = True """Enable description accumulation and LLM consolidation for same-name+label entity hits.""" ka_accum_merge_threshold: int = 10 """Number of accumulated description segments before LLM consolidation fires. Default: 10.""" ka_accum_merge_llm_retry_limit: int = 2 """Max LLM retries for consolidation before falling back to keeping the accumulated text.""" ka_accum_merge_reembed_interval: int = 3 """Re-embed the entity's accumulated description every N appends (0 = only on consolidation).""" # --- Filesystem / Prompt Context --- dir_tree_extra_roots: list[str] = field(default_factory=list) """Additional absolute filesystem paths to include in the directory tree injected into the system prompt. Set in config.yaml instead of hardcoding paths in source.""" @property def openrouter_api_key(self) -> str: """Backward-compatible alias for :attr:`api_key` (the OpenRouter key). Older subsystems were written against an ``openrouter_api_key`` attribute before the field was renamed to the provider-neutral ``api_key``; this read-only view keeps them working without a second copy of the secret. Read by the embedding pool (``gemini_embed_pool.py``) and by callers that construct an ``OpenRouterClient`` such as ``agents_main.py``, ``consolidation_main.py``, and ``prompt_context.py``. A trivial getter with no side effects. Returns: str: The configured OpenRouter API key (same value as ``api_key``). """ return self.api_key @openrouter_api_key.setter def openrouter_api_key(self, value: str) -> None: """Set the OpenRouter API key through the backward-compatible alias. 
Mirror of the :attr:`openrouter_api_key` getter: assigning here simply writes the canonical :attr:`api_key` field, so legacy code that still sets ``cfg.openrouter_api_key = ...`` updates the one real secret rather than shadowing it. A trivial setter with no side effects. Args: value (str): The OpenRouter API key to store on :attr:`api_key`. Returns: None """ self.api_key = value @property def API_KEYS(self) -> dict: """Upper-cased backward-compatible view of the :attr:`api_keys` mapping. Exposes the external-service credential dict (Brave, Vultr, xAI, ElevenLabs, Sporestack, ...) under the historical ``API_KEYS`` name that tool code was written against, e.g. ``ctx.config.API_KEYS["sporestack"]`` in ``tools/sporestack_tools.py``. Returns the live underlying dict rather than a copy, so the same key store is shared. Read by various tools that look up third-party service keys; a trivial getter with no side effects. Returns: dict: The :attr:`api_keys` mapping of service name to credential. """ return self.api_keys @property def configured_platforms(self) -> list[str]: """Return the set of enabled platform names as a lowercase list. Scans :attr:`platforms`, keeping only entries that are ``enabled`` with a non-empty ``type``, lowercases each, and collapses the self-bot variant ``"discord-self"`` to ``"discord"`` so callers see a single canonical Discord platform. The fallback trio ``discord``, ``matrix``, and ``webchat`` is always unioned in for backward compatibility and tests, so the result is a superset of what is strictly configured. Read by ``wallet_manager.py`` and ``tools/user_variables.py`` (via ``hasattr`` guards) to decide which platforms a feature applies to. Pure in-memory computation over the dataclass; no I/O. Returns: list[str]: Unique, lowercase platform identifiers (order not guaranteed since it derives from a set). 
""" types = set() for p in self.platforms: if p.enabled and p.type: t = p.type.lower() if t == "discord-self": t = "discord" types.add(t) # Ensure fallback defaults are always present for backward compatibility and testing for fallback in ["discord", "matrix", "webchat"]: types.add(fallback) return list(types)
[docs] def resolved_journal_units(self) -> list[str]: """systemd units the admin journal tails, honouring config then the fleet. When :attr:`journal_systemd_units` is set (via YAML) that explicit list wins verbatim. Otherwise the units are derived from the canonical fleet definition — ``core.control_ops.SERVICE_TIERS`` resolved through ``unit_name_for`` — so the default automatically tracks the live microservices (``stargazer-gateway``, ``stargazer-inference``, ``stargazer-agents``, ``stargazer-consolidation``, ``stargazer-web`` with the default prefix) and respects this deployment's ``control_unit_prefix`` / ``control_unit_names`` overrides. The old monolithic ``stargazer`` / ``stargazer-swarm`` units no longer exist post-T3. Read by the admin journal WebSocket in ``web/observability_routes.py`` to pick ``journalctl -u`` targets. Pure in-memory computation; no I/O. The import is deferred to keep ``config`` free of a load-time dependency on ``core.control_ops``. Returns: list[str]: Ordered systemd unit names to tail (never empty). """ if self.journal_systemd_units: return list(self.journal_systemd_units) from core.control_ops import fleet_units return fleet_units(self)
# Configuration field declarations (continuation of the enclosing config class,
# whose header is outside this excerpt). Each annotated name carries its
# built-in default; a bare string literal directly after a field is that
# field's attribute docstring.

# --- LLM quality filter ---
llm_filter_enabled: bool = False

# --- RLHF anti-pattern guardrail (inverted safety) ---
rlhf_guardrail_enabled: bool = True

# --- Ego ablation global killswitch ---
ego_ablation_enabled: bool = False

# --- Proactive responses ---
proactive_enabled: bool = False
proactive_default_frequency: float = 0.05
proactive_triage_enabled: bool = True
proactive_triage_model: str = "gemini-3.1-flash-lite"

# --- Channel heartbeat (background MRU nudge, flash model) ---
# NOTE(review): the ``_s`` suffix suggests the interval/tick values are in
# seconds -- confirm against the heartbeat scheduler.
channel_heartbeat_enabled: bool = True
channel_heartbeat_interval_min_s: float = 900.0
channel_heartbeat_interval_max_s: float = 2700.0
channel_heartbeat_tick_s: float = 45.0
channel_heartbeat_max_channels: int = 10
channel_heartbeat_concurrency: int = 3
channel_heartbeat_model: str = "gemini-3-flash"

# --- Background scheduler: periodic OpenRouter chat/completion tasks ---
background_scheduler_chat_llm_enabled: bool = True
background_scheduler_log_rag_ingest_enabled: bool = False
legacy_kg_extraction: bool = False
"""Whether the legacy auto_kg_extraction task is active. Disabled by default
since Knowledge Anchoring does the same thing, but in real time."""

# --- NCM: full disable per channel (platform:channel_id); see feature_toggles ---
# The factory builds the frozenset from the module-level
# DEFAULT_NCM_FULLY_DISABLED_CHANNELS constant (defined outside this excerpt).
ncm_fully_disabled_channels: frozenset[str] = field(
    default_factory=lambda: frozenset(DEFAULT_NCM_FULLY_DISABLED_CHANNELS),
)

# --- Message batching ---
# NOTE(review): unit of ``batch_window`` is not stated here (presumably
# seconds) -- confirm at the batching call site.
batch_window: float = 5.0
max_batch_size: int = 10

# --- Threadweave ---
dna_vault_path: str = "data/dna_vault"

# --- API key encryption (per-user keys in SQLite) ---
api_key_encryption_db_path: str = "data/api_key_encryption_keys.db"

# --- Media cache ---
media_cache_dir: str = "media_cache"
media_cache_max_mb: int = 500
media_download_retry_attempts: int = 3
"""Total attempts (1 = no retry) for a CDN attachment/emoji download before giving up.
A transient Discord/Matrix CDN blip otherwise drops the attachment for that one
message; bounded retry closes that one-shot loss."""

# --- Visual Memory (cross-channel image pattern recognition) --- # 👀
visual_memory_enabled: bool = True
"""Master toggle for the visual memory graph. When False, no image processing
occurs and no VisualEntity nodes are created."""
visual_memory_face_threshold: float = 0.65
"""Cosine similarity threshold for face identity matching. ArcFace embeddings
typically score 0.5-0.7 for same-person matches; 0.65 is conservative enough to
avoid false positives on CPU models."""
visual_memory_object_threshold: float = 0.75
"""Cosine similarity threshold for object/scene matching via SigLIP. Higher than
face threshold because object identity is less precise."""
visual_memory_insightface_model: str = "buffalo_sc"
"""InsightFace model pack. ``buffalo_sc`` is CPU-friendly (small); ``buffalo_l``
is more accurate but heavier. Both use ArcFace 512d."""
visual_memory_siglip_model: str = "google/siglip-so400m-patch14-224"
"""HuggingFace SigLIP model for general object/scene embeddings.
``so400m-patch14-224`` is the medium variant; CPU-manageable."""
visual_memory_max_entities_per_image: int = 10
"""Max number of face/object entities to extract from a single image."""
visual_memory_cache_ttl_seconds: int = 300
"""How long (seconds) recognition results stay in Redis for context injection.
5 minutes covers the typical conversation window."""
visual_memory_min_sightings_to_report: int = 2
"""Minimum sighting count before Star reports recognizing an entity. Set to 2 so
one-off sightings aren't mentioned (avoids noise)."""
visual_memory_text_density_threshold: float = 0.15
"""Canny edge density ratio above which an image is considered text-heavy
(screenshots, code, documents) and skipped for visual entity recognition.
Combined with color variance and histogram bimodality checks to avoid false
positives on textured photos.
Range 0.0-1.0; lower = more aggressive filtering."""

# --- Per-user LLM sandboxes (filesystem + Tor-enforced code execution) ---
user_sandboxes_dir: str = "data/user_sandboxes"
"""Root directory for ``{user_id}/workspace`` sandbox trees."""
# 1024**3 bytes == 1 GiB.
user_sandbox_quota_bytes: int = 1024**3
"""Max total size (bytes) of regular files per user under
``user_sandboxes_dir/{user_id}/``."""
user_sandbox_quota_mode: str = "auto"
"""``python`` = app-level byte counting; ``loopback`` = per-user ext4 loop image
(needs Linux+root); ``auto`` = try loopback then Python."""
user_sandbox_loopback_dir: str = "data/user_sandbox_loopback"
"""Sparse disk images for loopback sandboxes (see docs/USER_SANDBOX_TOR.md)."""
user_sandbox_loopback_index_path: str = "data/user_sandbox_loopback_index.json"
"""JSON map of user_id to image and mount paths for remount-after-reboot."""
user_sandbox_remount_on_startup: bool = True
"""If True, attempt to remount loopback sandboxes when the bot starts (Linux + root)."""
tor_gateway_container: str = "stargazer-tor-gateway"
"""Docker container name for ``docker run --network container:...`` Tor sidecar."""
sandbox_curl_image: str = "curlimages/curl:8.11.1"
"""Image for HTTPS downloads into a user sandbox over the Tor netns."""

# --- Emoji resolution ---
resolve_emojis_as_images: bool = True
max_emojis_per_message: int = 5

# --- Web GUI ---
# Both listeners default to the loopback address.
web_host: str = "127.0.0.1"
web_port: int = 8080
redis_platform_admin_host: str = "127.0.0.1"
redis_platform_admin_port: int = 8081

# Explicit systemd units for the admin journal WebSocket (journalctl -u …).
# Empty (the default) means "tail every fleet tier", resolved at read time by
# :meth:`resolved_journal_units` from ``core.control_ops.SERVICE_TIERS`` so the
# default tracks the live microservices (stargazer-gateway / -inference /
# -agents / -consolidation / -web) and honours any ``control_unit_prefix`` /
# ``control_unit_names`` overrides instead of hardcoding stale unit names.
# Systemd units tailed by the admin journal WebSocket; defaults to an empty list.
journal_systemd_units: list[str] = field(default_factory=list)

# --- Admin user IDs (bypass privilege escalation, access admin UI) ---
admin_user_ids: list[str] = field(default_factory=list)

# --- Shell tool legacy allowlist (see tools/shell_tool.py); prefer UNSANDBOXED_EXEC ---
shell_authorized_user_ids: list[str] = field(default_factory=list)

# --- Admin ops: service names and repo path (used by !bot_restart / !proxy_restart / !bot_pull) ---
bot_service_name: str = "stargazer-inference"
"""systemd unit for the back-compat ``!bot_restart`` (now an alias for
``!restart_inference``). The old monolithic ``stargazer`` unit no longer exists
post-T3; the inference tier is the closest analogue."""
proxy_service_name: str = "gemini-cli-proxy"
"""systemd service name for ``!proxy_restart`` (default: ``gemini-cli-proxy``)."""
bot_repo_path: str = ""
"""Absolute path to the bot git repo for ``!bot_pull``. Empty = current working directory."""

# --- Cluster control-ops (fleet-wide !restart_* / !bot_pull over sg:control:ops) ---
control_unit_prefix: str = "stargazer-"
"""Prefix for deriving a service's systemd unit name: ``{prefix}{service_name}``
(e.g. ``stargazer-gateway``). Overridden per-service by ``control_unit_names``."""
control_unit_names: dict[str, str] = field(default_factory=dict)
"""Optional explicit map of ``service_name -> systemd unit`` for non-standard deployments.
Falls back to ``control_unit_prefix + service_name`` when unset."""
control_proxy_handler_service: str = "gateway"
"""Which service tier issues ``!proxy_restart`` so the external proxy unit is
restarted exactly once (not once per service that hears the control op)."""
control_gateway_restart_grace: float = 8.0
"""Seconds the gateway waits before self-restarting, so ACKs/replies flush and it
restarts strictly LAST in a fleet-wide restart."""
control_service_restart_grace: float = 2.0
"""Seconds a non-gateway service waits before self-restarting (lets it RPUSH its
ack to the reply channel first)."""
# NOTE(review): unit not stated in the docstring (presumably seconds, like the
# neighbouring grace values) -- confirm.
control_reply_timeout: float = 3.0
"""How long the gateway publisher aggregates per-service acks before reporting."""
control_pull_lock_ttl: int = 90
"""TTL (seconds) for the per-host/repo ``sg:control:pull`` dedupe lock."""
# 21600 s == 6 h.
command_sync_cooldown_seconds: int = 21600
"""Min seconds between gateway slash-command tree syncs (Redis NX cooldown).
Boot-time auto-sync re-registers commands without tripping Discord's rate-limit
penalty box. Default 6h. Manual ``!sync_tree`` always bypasses it."""

# --- Prompt / pre-inference timeouts (seconds; 0 = no cap where supported) ---
prompt_context_build_timeout_seconds: float = 900.0
preinference_context_shield_timeout_seconds: float = 600.0
# Soft deadline for parallel pre-inference asyncio.wait (classify, skills, context, …).
preinference_gather_timeout_seconds: float = 100.0
batch_preprocess_shield_timeout_seconds: float = 30.0
# How long inference waits for a media-bearing user turn (attachment OR an
# image URL in the text) to finish download+encode and reach history before
# generating — else the model answers blind and the image surfaces only on
# the next turn. Must exceed url_utils.image's HEAD+GET budget (~23s).
# Timeout (seconds) for the media pre-processing wait.
media_preprocess_shield_timeout_seconds: float = 30.0
# NOTE(review): presumably the MAXLEN cap applied to Redis streams -- confirm
# at the stream write sites.
redis_stream_maxlen: int = 100_000

# --- Per-channel semantic recall (Redis KNN + zset ±neighbors, XML on user message) ---
channel_semantic_recall_enabled: bool = True
channel_semantic_recall_days: int = 30
channel_semantic_recall_top_k: int = 2
channel_semantic_recall_oversample_factor: int = 5
channel_semantic_recall_neighbor_before: int = 5
channel_semantic_recall_neighbor_after: int = 5
channel_semantic_recall_max_total_chars: int = 12000
channel_semantic_recall_max_window_chars: int = 4000
channel_semantic_recall_min_similarity: float = 0.65
channel_semantic_recall_timeout_seconds: float = 12.0
# Cross-channel recall: KNN over the speaker's most-recently-used
# channels (excluding the current one). Privacy-by-construction:
# MRU is derived from messages the user has personally sent.
# The cross-channel defaults below are tighter than the same-channel ones
# above (higher min similarity, smaller character budgets, fewer neighbours).
channel_semantic_recall_cross_channel_enabled: bool = True
channel_semantic_recall_cross_channel_top_k_channels: int = 10
channel_semantic_recall_cross_channel_top_k_hits: int = 2
channel_semantic_recall_cross_channel_min_similarity: float = 0.78
channel_semantic_recall_cross_channel_max_total_chars: int = 6000
channel_semantic_recall_cross_channel_max_window_chars: int = 2000
channel_semantic_recall_cross_channel_neighbor_before: int = 3
channel_semantic_recall_cross_channel_neighbor_after: int = 3
channel_semantic_recall_cross_channel_lookback_messages: int = 300

# --- Webhook ---
webhook_secret: str = ""

# --- Admin panel canonical URL ---
admin_panel_base_url: str = ""
session_cookie_domain: str = ""

# --- Discord OAuth2 (web UI authentication) ---
discord_oauth_client_id: str = ""
discord_oauth_client_secret: str = ""
discord_oauth_redirect_uri: str = ""

# --- OAuth2 token management (per-user service connections) ---
oauth_encryption_key: str = ""
oauth_base_url: str = ""
oauth_providers: dict[str, dict[str, Any]] = field(default_factory=dict)

# --- Per-platform configs ---
# ``load`` builds PlatformConfig entries from the YAML ``platforms`` list and,
# when that list is empty but a legacy ``user_id`` is set, synthesises a single
# Matrix entry.
platforms: list[PlatformConfig] = field(default_factory=list)

# --- Dynamic Logging configuration ---
log_level: str = "INFO"
structured_logging: bool = False

# --- Agent Skills (vector catalog + activate_skill) ---
skills_enabled: bool = False
skills_corpus_roots: list[str] = field(default_factory=list)
skills_index_db: str = "data/skills_index.db"
skills_top_k: int = 12
skills_similarity_threshold: float = 0.12
skills_catalog_max_chars: int = 4000

# --- mcpo: MCP-to-OpenAPI proxy (Docker Compose service + tools/mcpo_proxy_tools.py) ---
mcpo_enabled: bool = False
mcpo_base_url: str = ""
mcpo_api_key: str = ""
mcpo_config_path: str = "data/mcpo_servers.json"

# --- Dangerous-command embedding hint (RediSearch KNN on user message) ---
dangerous_command_warning_enabled: bool = True
# On Redis / index / KNN errors: "open" = fail-open (no suffix), "warn"/"closed" = inject warning suffix.
dangerous_command_guard_fail_mode: str = "open"
dangerous_command_similarity_threshold: float = 0.8
# Min gap (danger_sim - benign_sim) when idx:benign_tech has docs; 0 => require d > b only.
dangerous_command_benign_margin: float = 0.0

# --- Tool vector classifier tuning ---
tool_similarity_threshold: float = 0.30
tool_top_k: int = 15
tool_strategy_force_threshold: float = 0.80
tool_strategy_optional_threshold: float = 0.30
tool_group_expansion_threshold: float = 0.55
# Stricter floor for ``browser_*`` tools (rarely used; noisy retrieval).
tool_browser_similarity_threshold: float = 0.60

# --- Starwiki (OAuth-gated markdown wikis + bot tools + per-wiki RAG) ---
starwiki_enabled: bool = False
starwiki_root: str = "data/starwiki"
starwiki_worker_model: str = "gemini-3-flash"
starwiki_lint_interval_minutes: int = 60
starwiki_git_author: str = "starwiki-bot"
starwiki_git_author_email: str = "starwiki@local"
starwiki_max_source_mb: int = 25
starwiki_ingest_concurrency: int = 2
starwiki_allow_public_wiki_edit: bool = True
starwiki_rag_auto_index: bool = True
starwiki_scheduled_lint_includes_public: bool = True

# --- Attachment guard (unreadable text inlined from user attachments) ---
attachment_guard_unreadable_truncation_enabled: bool = True
attachment_guard_unreadable_truncation_max_chars: int = 2000
attachment_guard_unreadable_truncation_ascii_threshold: float = 0.05
attachment_guard_unreadable_truncation_lang_confidence: float = 0.5
attachment_guard_unreadable_truncation_sample_chars: int = 4096
# Min share of Unicode letters (L*); below this, treat as emoji/symbol spam.
attachment_guard_unreadable_truncation_min_letter_ratio: float = 0.05
# Near-max Shannon entropy catches encrypted/base64/hex/random ASCII payloads.
attachment_guard_unreadable_truncation_entropy_normalized_threshold: float = 0.98
attachment_guard_unreadable_truncation_entropy_min_bits_per_char: float = 3.9
attachment_guard_unreadable_truncation_entropy_min_chars: int = 1024

# ---- Legacy top-level Matrix fields (kept for backward compat) ----
homeserver: str = "https://matrix.org"
user_id: str = ""
password: str = ""
store_path: str = "nio_store"
credentials_file: str = "credentials.json"


@classmethod
def _parse_kg_config(cls, data: dict) -> dict:
    """Translate the ``knowledge_graph`` YAML section into ``kg_*`` kwargs.

    Every option is read from ``data["knowledge_graph"]`` with the matching
    class attribute on *cls* as the fallback, then coerced with ``int`` /
    ``float`` / ``bool`` (``extraction_model`` is passed through uncoerced).
    A non-dict section, or a non-dict ``extraction_channel_hints``, is
    treated as empty. Channel hints are stringified and entries with a
    ``None`` key or value are dropped. ``full_user_memory_ids`` is
    stringified, stripped and emptied of blanks when it is a list, and
    replaced by ``DEFAULT_KG_FULL_USER_MEMORY_IDS`` otherwise.

    Args:
        data (dict): The parsed ``config.yaml`` mapping.

    Returns:
        dict: ``kg_*`` field name -> value, in declaration order.
    """
    section = data.get("knowledge_graph", {})
    if not isinstance(section, dict):
        section = {}

    raw_hints = section.get("extraction_channel_hints", {})
    if not isinstance(raw_hints, dict):
        raw_hints = {}
    hints: dict[str, str] = {
        str(key): str(value)
        for key, value in raw_hints.items()
        if key is not None and value is not None
    }

    default_ids = list(DEFAULT_KG_FULL_USER_MEMORY_IDS)
    raw_ids = section.get("full_user_memory_ids", default_ids)
    if isinstance(raw_ids, list):
        stripped_ids = (str(uid).strip() for uid in raw_ids)
        full_user_ids = [uid for uid in stripped_ids if uid]
    else:
        full_user_ids = default_ids

    # The one YAML key whose field name is not simply ``kg_`` + key.
    renamed = {"min_message_length_for_extraction": "kg_min_message_length"}

    def coerce(rows):
        converted = {}
        for yaml_key, cast in rows:
            field_name = renamed.get(yaml_key, "kg_" + yaml_key)
            converted[field_name] = cast(
                section.get(yaml_key, getattr(cls, field_name))
            )
        return converted

    kwargs = {
        "kg_extraction_model": section.get(
            "extraction_model", cls.kg_extraction_model
        ),
    }
    kwargs.update(
        coerce(
            (
                ("max_hops", int),
                ("seed_top_k", int),
                ("seed_limit", int),
                ("seed_similarity_threshold", float),
                ("seed_dynamic_threshold_enabled", bool),
                ("seed_dynamic_threshold_target_ratio", float),
                ("seed_dynamic_threshold_min", float),
                ("seed_dynamic_threshold_min_stored", int),
            )
        )
    )
    kwargs["kg_full_user_memory_ids"] = full_user_ids
    kwargs.update(
        coerce(
            (
                ("user_seed_min", int),
                ("user_candidate_limit", int),
                ("lore_candidate_limit", int),
                ("lore_seed_min", int),
                ("meta_candidate_limit", int),
                ("meta_seed_min", int),
                ("recent_speaker_limit", int),
                ("min_edge_weight", float),
                ("default_edge_weight", float),
                ("retrieval_hop_decay", float),
                ("expansion_neighbor_limit", int),
                ("max_context_entities", int),
                ("entity_dedup_threshold", float),
                ("relationship_decay_factor", float),
                ("per_message_extraction", bool),
                ("min_message_length_for_extraction", int),
                ("per_user_extraction_limit", int),
            )
        )
    )
    kwargs["kg_extraction_channel_hints"] = hints
    return kwargs


@classmethod
def _parse_mementropic_config(cls, data: dict) -> dict:
    """Translate the ``mementropic`` YAML section into ``mementropic_*`` kwargs.

    Each option is looked up in ``data["mementropic"]`` (a non-dict section
    is treated as empty), falls back to the same-named class attribute on
    *cls*, and is coerced to the listed type.

    Args:
        data (dict): The parsed ``config.yaml`` mapping.

    Returns:
        dict: ``mementropic_*`` field name -> coerced value.
    """
    section = data.get("mementropic", {})
    if not isinstance(section, dict):
        section = {}

    prefix = "mementropic_"
    rows = (
        ("late_fusion_enabled", bool),
        ("semantic_weight", float),
        ("resonance_weight", float),
        ("ledger_max_entries", int),
        ("reconsolidation_enabled", bool),
        ("reconsolidation_learning_rate", float),
        ("reconsolidation_max_step", float),
        ("reconsolidation_max_entities", int),
    )
    return {
        prefix + key: cast(section.get(key, getattr(cls, prefix + key)))
        for key, cast in rows
    }


@classmethod
def _parse_persona_pref_config(cls, data: dict) -> dict:
    """Translate the ``persona_preferences`` YAML section into ``persona_*`` kwargs.

    Each option is looked up in ``data["persona_preferences"]`` (a non-dict
    section is treated as empty), falls back to the corresponding class
    attribute on *cls*, and is coerced to the listed type.

    Args:
        data (dict): The parsed ``config.yaml`` mapping.

    Returns:
        dict: ``persona_*`` field name -> coerced value.
    """
    section = data.get("persona_preferences", {})
    if not isinstance(section, dict):
        section = {}

    # (YAML key, field name, coercion); field names do not share one prefix.
    rows = (
        ("enabled", "persona_preferences_enabled", bool),
        ("extraction_enabled", "persona_pref_extraction_enabled", bool),
        (
            "extraction_rate_limit_seconds",
            "persona_pref_extraction_rate_limit_seconds",
            int,
        ),
        (
            "extraction_min_response_length",
            "persona_pref_extraction_min_response_length",
            int,
        ),
        ("injection_max_count", "persona_pref_injection_max_count", int),
        ("injection_max_chars", "persona_pref_injection_max_chars", int),
        ("base_persona_id", "persona_pref_base_persona_id", str),
    )
    return {
        field_name: cast(section.get(yaml_key, getattr(cls, field_name)))
        for yaml_key, field_name, cast in rows
    }


@classmethod
def _parse_starwiki_config(cls, data: dict) -> dict:
    """Translate the ``starwiki`` YAML section into ``starwiki_*`` kwargs.

    Each option is looked up in ``data["starwiki"]`` (a non-dict section is
    treated as empty), falls back to the same-named class attribute on
    *cls*, and is coerced to the listed type.

    Args:
        data (dict): The parsed ``config.yaml`` mapping.

    Returns:
        dict: ``starwiki_*`` field name -> coerced value.
    """
    section = data.get("starwiki", {})
    if not isinstance(section, dict):
        section = {}

    prefix = "starwiki_"
    rows = (
        ("enabled", bool),
        ("root", str),
        ("worker_model", str),
        ("lint_interval_minutes", int),
        ("git_author", str),
        ("git_author_email", str),
        ("max_source_mb", int),
        ("ingest_concurrency", int),
        ("allow_public_wiki_edit", bool),
        ("rag_auto_index", bool),
        ("scheduled_lint_includes_public", bool),
    )
    return {
        prefix + key: cast(section.get(key, getattr(cls, prefix + key)))
        for key, cast in rows
    }


@classmethod
def _parse_vector_store_config(cls, data: dict) -> dict:
    """Translate the ``vector_store`` YAML section into ``vector_pg_*`` kwargs.

    Each option is looked up in ``data["vector_store"]`` (a non-dict section
    is treated as empty), falls back to the same-named class attribute on
    *cls*, and is coerced to ``str`` or ``int`` as listed.

    Args:
        data (dict): The parsed ``config.yaml`` mapping.

    Returns:
        dict: ``vector_pg_*`` field name -> coerced value.
    """
    section = data.get("vector_store", {})
    if not isinstance(section, dict):
        section = {}

    prefix = "vector_pg_"
    rows = (
        ("dsn", str),
        ("host", str),
        ("port", int),
        ("database", str),
        ("user", str),
        ("password", str),
        ("sslmode", str),
        ("min_size", int),
        ("max_size", int),
    )
    return {
        prefix + key: cast(section.get(key, getattr(cls, prefix + key)))
        for key, cast in rows
    }


@classmethod
def _parse_attachment_guard_config(cls, data: dict) -> dict:
    """Translate ``attachment_guard.unreadable_truncation`` YAML into kwargs.

    Descends ``data["attachment_guard"]["unreadable_truncation"]`` (either
    level is treated as empty when it is not a dict), then reads each option
    with the same-named class attribute on *cls* as fallback and coerces it
    to the listed type.

    Args:
        data (dict): The parsed ``config.yaml`` mapping.

    Returns:
        dict: ``attachment_guard_unreadable_truncation_*`` field name ->
        coerced value.
    """
    guard = data.get("attachment_guard", {})
    if not isinstance(guard, dict):
        guard = {}
    section = guard.get("unreadable_truncation", {})
    if not isinstance(section, dict):
        section = {}

    prefix = "attachment_guard_unreadable_truncation_"
    rows = (
        ("enabled", bool),
        ("max_chars", int),
        ("ascii_threshold", float),
        ("lang_confidence", float),
        ("sample_chars", int),
        ("min_letter_ratio", float),
        ("entropy_normalized_threshold", float),
        ("entropy_min_bits_per_char", float),
        ("entropy_min_chars", int),
    )
    return {
        prefix + key: cast(section.get(key, getattr(cls, prefix + key)))
        for key, cast in rows
    }


@classmethod
def _parse_knowledge_anchoring_config(cls, data: dict) -> dict:
    """Translate the ``knowledge_anchoring`` YAML section into ``ka_*`` kwargs.

    Reads the top-level ``enabled`` flag plus the ``worker``, ``gates``,
    ``sunset``, ``gc``, ``dedup`` and ``org`` sub-sections (any of which is
    treated as empty when it is not a dict). Each option falls back to the
    same-named class attribute on *cls* and is coerced to the listed type.
    ``gates.significant_tools`` is stringified when it is a list; otherwise
    a built-in default tool list is used. The sunset window bounds are
    converted and validated before any other option is coerced.

    Args:
        data (dict): The parsed ``config.yaml`` mapping.

    Returns:
        dict: ``ka_*`` field name -> value, in declaration order.

    Raises:
        ValueError: If ``sunset.window_end_days`` is greater than or equal
            to ``sunset.window_start_days``.
    """

    def as_mapping(value):
        return value if isinstance(value, dict) else {}

    root = as_mapping(data.get("knowledge_anchoring", {}))
    worker = as_mapping(root.get("worker", {}))
    gates = as_mapping(root.get("gates", {}))
    sunset = as_mapping(root.get("sunset", {}))
    gc_cfg = as_mapping(root.get("gc", {}))
    dedup = as_mapping(root.get("dedup", {}))
    org = as_mapping(root.get("org", {}))

    # Significant tools: YAML list, or the production-safe default list.
    raw_tools = gates.get("significant_tools", None)
    significant_tools = (
        [str(tool) for tool in raw_tools]
        if isinstance(raw_tools, list)
        else [
            "search_web",
            "read_file",
            "kg_search_entities",
            "unsandboxed_python_tool",
        ]
    )

    # Guard: window_end_days must be strictly less than window_start_days.
    _start = int(sunset.get("window_start_days", cls.ka_sunset_window_start_days))
    _end = int(sunset.get("window_end_days", cls.ka_sunset_window_end_days))
    if _end >= _start:
        raise ValueError(
            f"knowledge_anchoring.sunset.window_end_days ({_end}) must be "
            f"less than window_start_days ({_start})"
        )

    def coerce(section, prefix, rows):
        # Field name is ``prefix + yaml_key``; the class attribute of that
        # name supplies the fallback.
        return {
            prefix + key: cast(section.get(key, getattr(cls, prefix + key)))
            for key, cast in rows
        }

    kwargs = coerce(root, "ka_", (("enabled", bool),))
    # Worker
    kwargs.update(
        coerce(
            worker,
            "ka_",
            (
                ("batch_size", int),
                ("sleep_interval_seconds", int),
                ("max_anchors_per_channel_per_day", int),
                ("dlq_retry_limit", int),
                ("backpressure_threshold", int),
                ("backpressure_recovery", int),
                ("batch_min_size", int),
                ("epoch_size", int),
                ("fast_path_prefix", str),
            ),
        )
    )
    # Gates
    kwargs.update(
        coerce(
            gates,
            "ka_",
            (
                ("length_minimum", int),
                ("similarity_familiar", float),
                ("similarity_anomaly", float),
                ("density_threshold", float),
                ("tool_rate_limit_per_user_per_hour", int),
            ),
        )
    )
    kwargs["ka_significant_tools"] = significant_tools
    # Catch-up Gate
    kwargs.update(
        coerce(
            gates,
            "ka_",
            (
                ("catchup_enabled", bool),
                ("catchup_similarity_familiar", float),
                ("catchup_similarity_anomaly", float),
                ("catchup_density_threshold", float),
                ("catchup_max_anchors_per_day", int),
            ),
        )
    )
    # Sunset
    kwargs.update(coerce(sunset, "ka_sunset_", (("interval_seconds", int),)))
    kwargs["ka_sunset_window_start_days"] = _start
    kwargs["ka_sunset_window_end_days"] = _end
    kwargs.update(
        coerce(
            sunset,
            "ka_sunset_",
            (
                ("daily_summaries_per_epoch", int),
                ("token_reduction_threshold", int),
                ("hydration_similarity", float),
            ),
        )
    )
    # GC
    kwargs.update(coerce(gc_cfg, "ka_gc_", (("orphan_age_days", int),)))
    # Deduplication Worker
    kwargs.update(
        coerce(
            dedup,
            "ka_dedup_",
            (
                ("enabled", bool),
                ("interval_seconds", int),
                ("semantic_threshold", float),
                ("structural_batch_size", int),
                ("semantic_batch_size", int),
                ("llm_retry_limit", int),
                ("neighbourhood_limit", int),
            ),
        )
    )
    # Organization Worker
    kwargs.update(
        coerce(
            org,
            "ka_org_",
            (
                ("enabled", bool),
                ("interval_seconds", int),
                ("hub_degree_threshold", int),
                ("max_hubs_per_cycle", int),
            ),
        )
    )
    # Worker HTTP
    kwargs.update(
        coerce(worker, "ka_", (("extraction_http_timeout_seconds", float),))
    )
    return kwargs
@classmethod
def load(cls, path: str | Path = "config.yaml") -> "Config":
    """Build a fully-resolved :class:`Config` from YAML plus env overrides.

    The primary configuration entry point for the whole bot. It reads
    *path* (default ``config.yaml``) if present, deep-merges an optional
    ``data/skills_runtime.yaml`` overlay via :func:`_deep_merge_dict`, fans
    the nested sections out through the ``_parse_*`` helpers (knowledge
    graph, mementropic, persona prefs, starwiki, vector store, attachment
    guard, knowledge anchoring), normalises the platform list / legacy
    Matrix fields / tool permissions / API-key and admin-id lists,
    constructs the :class:`Config`, and then applies typed
    environment-variable overrides from ``env_map``.

    Environment values are coerced to the type of the field's current
    value: bool, float, int, or list (comma-separated, blank items
    dropped); anything else is stored as the raw string. ``top_p`` is
    special-cased so a blank value falls back to the class default.

    Finally it folds in a ``DISCORD_TOKEN`` and per-provider OAuth
    credentials from the environment, anchors the Redis mTLS certificate
    paths and the mcpo config path relative to the config file's
    directory, and warns when ``webhook_secret`` is empty while bound to a
    non-loopback host.

    Reads the filesystem (the YAML files and, via ``Path.resolve``, the
    cert paths) and ``os.environ``; it calls ``sanitize_llm_http_url`` on
    the LLM base URL but opens no Redis/KG/LLM/HTTP connection itself.
    Typically invoked at startup as a bare ``Config.load()``.

    Args:
        path (str | Path): Path to the YAML config file. Defaults to
            ``"config.yaml"``; a missing file is tolerated and yields a
            defaults-only configuration.

    Returns:
        Config: A fully populated, env-overridden configuration instance.

    Raises:
        ValueError: Propagated from the ``_parse_*`` helpers (e.g.
            :meth:`_parse_knowledge_anchoring_config`) or raised by
            ``int()`` / ``float()`` when a YAML or environment value
            cannot be converted to the field's numeric type.
    """
    # Local imports keep this method self-contained; the logger is created
    # up front so early, best-effort steps can report problems too.
    import ipaddress
    import logging as _logging

    _cfg_log = _logging.getLogger(__name__)

    data: dict = {}
    config_path = Path(path)
    if config_path.exists():
        with open(config_path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}

    # Optional overlay (e.g. from scripts/skills_corpus_pipeline.py)
    runtime_path = Path("data/skills_runtime.yaml")
    if runtime_path.exists():
        try:
            with open(runtime_path, "r", encoding="utf-8") as f:
                overlay = yaml.safe_load(f) or {}
            if isinstance(overlay, dict):
                data = _deep_merge_dict(data, overlay)
        except OSError as exc:
            # The overlay is best-effort: keep the base config, but leave
            # a trace instead of silently ignoring an unreadable file.
            _cfg_log.warning(
                "Could not read runtime overlay %s: %s", runtime_path, exc
            )

    # --- Parse platform list (new-style) --------------------------
    raw_platforms: list[dict[str, Any]] = data.get("platforms", [])
    platform_configs: list[PlatformConfig] = []
    for raw in raw_platforms:
        ptype = raw.get("type", "")
        enabled = raw.get("enabled", True)
        settings = {k: v for k, v in raw.items() if k not in ("type", "enabled")}
        platform_configs.append(
            PlatformConfig(
                type=ptype,
                enabled=enabled,
                settings=settings,
            )
        )

    # --- Legacy top-level Matrix fields ---------------------------
    homeserver = data.get("homeserver", cls.homeserver)
    user_id_val = data.get("user_id", cls.user_id)
    password = data.get("password", cls.password)
    store_path = data.get("store_path", cls.store_path)
    credentials_file = data.get("credentials_file", cls.credentials_file)

    # If no platforms list but legacy Matrix fields are present,
    # synthesise a Matrix platform entry for backward compatibility.
    if not platform_configs and user_id_val:
        platform_configs.append(
            PlatformConfig(
                type="matrix",
                enabled=True,
                settings={
                    "homeserver": homeserver,
                    "user_id": user_id_val,
                    "password": password,
                    "store_path": store_path,
                    "credentials_file": credentials_file,
                },
            )
        )

    # --- Tool permissions --------------------------------------
    raw_perms: dict[str, list[str]] = data.get("tool_permissions", {})
    tool_permissions = (
        {k: [str(uid) for uid in v] for k, v in raw_perms.items()}
        if isinstance(raw_perms, dict)
        else {}
    )

    # --- API keys -----------------------------------------------
    raw_api_keys = data.get("api_keys", {})
    api_keys = dict(raw_api_keys) if isinstance(raw_api_keys, dict) else {}

    # Inject API keys from env vars if not in YAML
    for env_name, key_name in (
        ("XAI_API_KEY", "xai"),
        ("ELEVENLABS_API_KEY", "elevenlabs"),
        ("SUNO_COOKIE", "suno_cookie"),
        ("WOLFRAM_ALPHA_APP_ID", "wolfram_alpha"),
        ("POLLINATIONS_API_KEY", "pollinations"),
    ):
        if key_name not in api_keys:
            env_val = os.environ.get(env_name, "")
            if env_val:
                api_keys[key_name] = env_val

    # --- Admin user IDs ----------------------------------------
    raw_admin_ids = data.get("admin_user_ids", [])
    admin_user_ids = (
        [str(uid) for uid in raw_admin_ids]
        if isinstance(raw_admin_ids, list)
        else []
    )

    raw_shell_ids = data.get("shell_authorized_user_ids")
    if isinstance(raw_shell_ids, list):
        shell_authorized_user_ids = [str(uid) for uid in raw_shell_ids]
    else:
        shell_authorized_user_ids = list(_DEFAULT_SHELL_AUTHORIZED_USER_IDS)

    # --- Admin panel base URL -----------------------------------
    admin_panel_base_url = str(data.get("admin_panel_base_url", "")).rstrip("/")
    session_cookie_domain = str(data.get("session_cookie_domain", "")).strip()

    # --- Discord OAuth2 ----------------------------------------
    discord_oauth = data.get("discord_oauth", {})
    if not isinstance(discord_oauth, dict):
        discord_oauth = {}

    # Derive redirect_uri from admin_panel_base_url when not explicit.
    _oauth_redirect = discord_oauth.get("redirect_uri", "")
    if not _oauth_redirect and admin_panel_base_url:
        _oauth_redirect = f"{admin_panel_base_url}/auth/callback"

    # --- OAuth2 token management --------------------------------
    oauth_cfg = data.get("oauth", {})
    if not isinstance(oauth_cfg, dict):
        oauth_cfg = {}
    oauth_providers_raw = oauth_cfg.get("providers", {})
    if not isinstance(oauth_providers_raw, dict):
        oauth_providers_raw = {}

    # --- Proactive responses ----------------------------------
    proactive_cfg = data.get("proactive", {})
    if not isinstance(proactive_cfg, dict):
        proactive_cfg = {}

    # --- Channel heartbeat ------------------------------------
    ch_hb = data.get("channel_heartbeat", {})
    if not isinstance(ch_hb, dict):
        ch_hb = {}

    # --- Background scheduler (chat LLM periodic tasks) --------
    bg_sched = data.get("background_scheduler", {})
    if not isinstance(bg_sched, dict):
        bg_sched = {}

    # --- mcpo (MCP OpenAPI proxy) -----------------------------
    mcpo_cfg = data.get("mcpo", {})
    if not isinstance(mcpo_cfg, dict):
        mcpo_cfg = {}

    # --- Agent Skills -----------------------------------------
    skills_cfg = data.get("skills", {})
    if not isinstance(skills_cfg, dict):
        skills_cfg = {}
    skills_roots_raw = skills_cfg.get("corpus_roots", [])
    if isinstance(skills_roots_raw, list):
        skills_corpus_roots = [str(p) for p in skills_roots_raw]
    else:
        skills_corpus_roots = []

    # --- Tool classifier tuning ----------------------------------
    tc_cfg = data.get("tool_classifier", {})
    if not isinstance(tc_cfg, dict):
        tc_cfg = {}

    resolved_api_key = (
        data.get("api_key") or data.get("openrouter_api_key") or cls.api_key
    )

    # Create a default instance to retrieve factory-initialized default
    # list overrides safely
    default_cfg = cls()

    # --- Override lists normalization ---
    raw_user_override = data.get("overall_user_id_absolute_override_list")
    if isinstance(raw_user_override, list):
        overall_user_id_absolute_override_list = [str(x) for x in raw_user_override]
    else:
        overall_user_id_absolute_override_list = list(
            default_cfg.overall_user_id_absolute_override_list
        )

    raw_channel_override = data.get("overall_channel_id_absolute_override_list")
    if isinstance(raw_channel_override, list):
        overall_channel_id_absolute_override_list = [
            str(x) for x in raw_channel_override
        ]
    else:
        overall_channel_id_absolute_override_list = list(
            default_cfg.overall_channel_id_absolute_override_list
        )

    _ncm_list = data.get("ncm_fully_disabled_channels")
    if isinstance(_ncm_list, list):
        ncm_fully_disabled_channels = frozenset(str(x) for x in _ncm_list)
    else:
        ncm_fully_disabled_channels = None  # use dataclass default

    if "top_p" in data:
        _raw_tp = data["top_p"]
        _top_p: float = cls.top_p if _raw_tp is None else float(_raw_tp)
    else:
        _top_p = cls.top_p

    # --- Redis sentinels normalization ---
    raw_sentinels = data.get("redis_sentinels") or []
    if isinstance(raw_sentinels, str):
        redis_sentinels = [s.strip() for s in raw_sentinels.split(",") if s.strip()]
    elif isinstance(raw_sentinels, list):
        redis_sentinels = [str(s).strip() for s in raw_sentinels if str(s).strip()]
    else:
        redis_sentinels = []

    cfg = cls(
        api_key=resolved_api_key,
        gemini_api_key=data.get("gemini_api_key", cls.gemini_api_key),
        llm_base_url=sanitize_llm_http_url(
            str(data.get("llm_base_url", cls.llm_base_url)),
        ),
        model=data.get("model", cls.model),
        temperature=float(data.get("temperature", cls.temperature)),
        max_tokens=int(data.get("max_tokens", cls.max_tokens)),
        top_p=_top_p,
        openrouter_http_connect_timeout_seconds=float(
            data.get(
                "openrouter_http_connect_timeout_seconds",
                cls.openrouter_http_connect_timeout_seconds,
            ),
        ),
        openrouter_http_read_timeout_seconds=float(
            data.get(
                "openrouter_http_read_timeout_seconds",
                cls.openrouter_http_read_timeout_seconds,
            ),
        ),
        openrouter_http_write_timeout_seconds=float(
            data.get(
                "openrouter_http_write_timeout_seconds",
                cls.openrouter_http_write_timeout_seconds,
            ),
        ),
        openrouter_http_pool_timeout_seconds=float(
            data.get(
                "openrouter_http_pool_timeout_seconds",
                cls.openrouter_http_pool_timeout_seconds,
            ),
        ),
        system_prompt_file=data.get("system_prompt_file", cls.system_prompt_file),
        max_history=int(data.get("max_history", cls.max_history)),
        tools_dir=data.get("tools_dir", cls.tools_dir),
        tools_service_mode=str(
            data.get("tools_service_mode", cls.tools_service_mode)
        ).strip().lower(),
        tools_exec_timeout=float(
            data.get("tools_exec_timeout", cls.tools_exec_timeout)
        ),
        tools_local_fallback=bool(
            data.get("tools_local_fallback", cls.tools_local_fallback)
        ),
        tools_force_in_process=[
            str(x).strip()
            for x in (data.get("tools_force_in_process") or [])
            if str(x).strip()
        ],
        tools_require_session_record=bool(
            data.get("tools_require_session_record", cls.tools_require_session_record)
        ),
        tool_permissions=tool_permissions,
        api_keys=api_keys,
        prowlarr_base_url=str(data.get("prowlarr_base_url", cls.prowlarr_base_url)),
        prowlarr_api_key=str(data.get("prowlarr_api_key", cls.prowlarr_api_key)),
        cursor_api_key=str(data.get("cursor_api_key", cls.cursor_api_key)),
        redis_url=data.get("redis_url", cls.redis_url),
        redis_sentinels=redis_sentinels,
        redis_sentinel_master=str(
            data.get("redis_sentinel_master", cls.redis_sentinel_master or "falkordb")
        ),
        redis_tls_cert=data.get("redis_tls_cert", cls.redis_tls_cert),
        redis_tls_key=data.get("redis_tls_key", cls.redis_tls_key),
        redis_tls_ca=data.get("redis_tls_ca", cls.redis_tls_ca),
        redis_tls_verify_peer=bool(
            data.get("redis_tls_verify_peer", cls.redis_tls_verify_peer)
        ),
        redis_max_retries=int(
            data.get("redis_max_retries", cls.redis_max_retries)
        ),
        redis_health_check_interval=float(
            data.get(
                "redis_health_check_interval", cls.redis_health_check_interval
            )
        ),
        redis_socket_keepalive=bool(
            data.get("redis_socket_keepalive", cls.redis_socket_keepalive)
        ),
        redis_socket_connect_timeout=float(
            data.get(
                "redis_socket_connect_timeout",
                cls.redis_socket_connect_timeout,
            )
        ),
        redis_socket_timeout=float(
            data.get("redis_socket_timeout", cls.redis_socket_timeout)
        ),
        embedding_model=data.get("embedding_model", cls.embedding_model),
        embedding_batch_size=int(
            data.get("embedding_batch_size", cls.embedding_batch_size)
        ),
        embedding_flush_interval=float(
            data.get("embedding_flush_interval", cls.embedding_flush_interval)
        ),
        **cls._parse_vector_store_config(data),
        **cls._parse_kg_config(data),
        **cls._parse_mementropic_config(data),
        **cls._parse_persona_pref_config(data),
        **cls._parse_starwiki_config(data),
        **cls._parse_attachment_guard_config(data),
        **cls._parse_knowledge_anchoring_config(data),
        llm_filter_enabled=bool(
            data.get("llm_filter_enabled", cls.llm_filter_enabled)
        ),
        rlhf_guardrail_enabled=bool(
            data.get("rlhf_guardrail_enabled", cls.rlhf_guardrail_enabled)
        ),
        ego_ablation_enabled=bool(
            data.get("ego_ablation_enabled", cls.ego_ablation_enabled)
        ),
        proactive_enabled=bool(proactive_cfg.get("enabled", cls.proactive_enabled)),
        proactive_default_frequency=float(
            proactive_cfg.get("default_frequency", cls.proactive_default_frequency)
        ),
        proactive_triage_enabled=bool(
            proactive_cfg.get("triage_enabled", cls.proactive_triage_enabled)
        ),
        proactive_triage_model=str(
            proactive_cfg.get("triage_model", cls.proactive_triage_model)
        ),
        channel_heartbeat_enabled=bool(
            ch_hb.get("enabled", cls.channel_heartbeat_enabled)
        ),
        channel_heartbeat_interval_min_s=float(
            ch_hb.get("interval_min_s", cls.channel_heartbeat_interval_min_s),
        ),
        channel_heartbeat_interval_max_s=float(
            ch_hb.get("interval_max_s", cls.channel_heartbeat_interval_max_s),
        ),
        channel_heartbeat_tick_s=float(
            ch_hb.get("tick_s", cls.channel_heartbeat_tick_s),
        ),
        channel_heartbeat_max_channels=int(
            ch_hb.get("max_channels", cls.channel_heartbeat_max_channels),
        ),
        channel_heartbeat_concurrency=int(
            ch_hb.get("concurrency", cls.channel_heartbeat_concurrency),
        ),
        channel_heartbeat_model=str(
            ch_hb.get("model", cls.channel_heartbeat_model),
        ),
        background_scheduler_chat_llm_enabled=bool(
            bg_sched.get(
                "chat_llm_enabled",
                cls.background_scheduler_chat_llm_enabled,
            ),
        ),
        background_scheduler_log_rag_ingest_enabled=bool(
            bg_sched.get(
                "log_rag_ingest_enabled",
                cls.background_scheduler_log_rag_ingest_enabled,
            ),
        ),
        legacy_kg_extraction=bool(
            data.get("legacy_kg_extraction", cls.legacy_kg_extraction)
        ),
        **(
            {}
            if ncm_fully_disabled_channels is None
            else {
                "ncm_fully_disabled_channels": ncm_fully_disabled_channels,
            }
        ),
        batch_window=float(data.get("batch_window", cls.batch_window)),
        max_batch_size=int(data.get("max_batch_size", cls.max_batch_size)),
        dna_vault_path=data.get("dna_vault_path", cls.dna_vault_path),
        api_key_encryption_db_path=data.get(
            "api_key_encryption_db_path", cls.api_key_encryption_db_path
        ),
        media_cache_dir=data.get("media_cache_dir", cls.media_cache_dir),
        media_cache_max_mb=int(
            data.get("media_cache_max_mb", cls.media_cache_max_mb)
        ),
        media_download_retry_attempts=int(
            data.get(
                "media_download_retry_attempts",
                cls.media_download_retry_attempts,
            )
        ),
        user_sandboxes_dir=data.get("user_sandboxes_dir", cls.user_sandboxes_dir),
        user_sandbox_quota_bytes=int(
            data.get("user_sandbox_quota_bytes", cls.user_sandbox_quota_bytes),
        ),
        user_sandbox_quota_mode=str(
            data.get("user_sandbox_quota_mode", cls.user_sandbox_quota_mode),
        ),
        user_sandbox_loopback_dir=data.get(
            "user_sandbox_loopback_dir", cls.user_sandbox_loopback_dir
        ),
        user_sandbox_loopback_index_path=data.get(
            "user_sandbox_loopback_index_path",
            cls.user_sandbox_loopback_index_path,
        ),
        user_sandbox_remount_on_startup=bool(
            data.get(
                "user_sandbox_remount_on_startup",
                cls.user_sandbox_remount_on_startup,
            ),
        ),
        tor_gateway_container=data.get(
            "tor_gateway_container", cls.tor_gateway_container
        ),
        sandbox_curl_image=data.get("sandbox_curl_image", cls.sandbox_curl_image),
        resolve_emojis_as_images=bool(
            data.get("resolve_emojis_as_images", cls.resolve_emojis_as_images)
        ),
        max_emojis_per_message=int(
            data.get("max_emojis_per_message", cls.max_emojis_per_message)
        ),
        web_host=data.get("web_host", cls.web_host),
        web_port=int(data.get("web_port", cls.web_port)),
        redis_platform_admin_host=data.get(
            "redis_platform_admin_host", cls.redis_platform_admin_host
        ),
        redis_platform_admin_port=int(
            data.get("redis_platform_admin_port", cls.redis_platform_admin_port)
        ),
        journal_systemd_units=(
            [
                str(x).strip()
                for x in data.get("journal_systemd_units") or []
                if str(x).strip()
            ]
            if isinstance(data.get("journal_systemd_units"), list)
            else []  # empty sentinel → resolved_journal_units() derives from the fleet
        ),
        admin_user_ids=admin_user_ids,
        shell_authorized_user_ids=shell_authorized_user_ids,
        # Admin-ops / cluster control-ops
        bot_service_name=data.get("bot_service_name", cls.bot_service_name),
        proxy_service_name=data.get("proxy_service_name", cls.proxy_service_name),
        bot_repo_path=data.get("bot_repo_path", cls.bot_repo_path),
        control_unit_prefix=data.get("control_unit_prefix", cls.control_unit_prefix),
        control_unit_names=(
            dict(data.get("control_unit_names"))
            if isinstance(data.get("control_unit_names"), dict)
            else {}
        ),
        control_proxy_handler_service=data.get(
            "control_proxy_handler_service", cls.control_proxy_handler_service
        ),
        control_gateway_restart_grace=float(
            data.get("control_gateway_restart_grace", cls.control_gateway_restart_grace)
        ),
        control_service_restart_grace=float(
            data.get("control_service_restart_grace", cls.control_service_restart_grace)
        ),
        control_reply_timeout=float(
            data.get("control_reply_timeout", cls.control_reply_timeout)
        ),
        control_pull_lock_ttl=int(
            data.get("control_pull_lock_ttl", cls.control_pull_lock_ttl)
        ),
        command_sync_cooldown_seconds=int(
            data.get("command_sync_cooldown_seconds", cls.command_sync_cooldown_seconds)
        ),
        prompt_context_build_timeout_seconds=float(
            data.get("prompt_context_build_timeout_seconds", 900.0),
        ),
        preinference_context_shield_timeout_seconds=float(
            data.get("preinference_context_shield_timeout_seconds", 600.0),
        ),
        preinference_gather_timeout_seconds=float(
            data.get("preinference_gather_timeout_seconds", 100.0),
        ),
        batch_preprocess_shield_timeout_seconds=float(
            data.get("batch_preprocess_shield_timeout_seconds", 30.0),
        ),
        media_preprocess_shield_timeout_seconds=float(
            data.get("media_preprocess_shield_timeout_seconds", 30.0),
        ),
        redis_stream_maxlen=int(data.get("redis_stream_maxlen", 100_000)),
        channel_semantic_recall_enabled=bool(
            data.get(
                "channel_semantic_recall_enabled",
                cls.channel_semantic_recall_enabled,
            ),
        ),
        channel_semantic_recall_days=int(
            data.get(
                "channel_semantic_recall_days", cls.channel_semantic_recall_days
            ),
        ),
        channel_semantic_recall_top_k=int(
            data.get(
                "channel_semantic_recall_top_k", cls.channel_semantic_recall_top_k
            ),
        ),
        channel_semantic_recall_oversample_factor=int(
            data.get(
                "channel_semantic_recall_oversample_factor",
                cls.channel_semantic_recall_oversample_factor,
            ),
        ),
        channel_semantic_recall_neighbor_before=int(
            data.get(
                "channel_semantic_recall_neighbor_before",
                cls.channel_semantic_recall_neighbor_before,
            ),
        ),
        channel_semantic_recall_neighbor_after=int(
            data.get(
                "channel_semantic_recall_neighbor_after",
                cls.channel_semantic_recall_neighbor_after,
            ),
        ),
        channel_semantic_recall_max_total_chars=int(
            data.get(
                "channel_semantic_recall_max_total_chars",
                cls.channel_semantic_recall_max_total_chars,
            ),
        ),
        channel_semantic_recall_max_window_chars=int(
            data.get(
                "channel_semantic_recall_max_window_chars",
                cls.channel_semantic_recall_max_window_chars,
            ),
        ),
        channel_semantic_recall_min_similarity=float(
            data.get(
                "channel_semantic_recall_min_similarity",
                cls.channel_semantic_recall_min_similarity,
            ),
        ),
        channel_semantic_recall_cross_channel_enabled=bool(
            data.get(
                "channel_semantic_recall_cross_channel_enabled",
                cls.channel_semantic_recall_cross_channel_enabled,
            ),
        ),
        channel_semantic_recall_cross_channel_top_k_channels=int(
            data.get(
                "channel_semantic_recall_cross_channel_top_k_channels",
                cls.channel_semantic_recall_cross_channel_top_k_channels,
            ),
        ),
        channel_semantic_recall_cross_channel_top_k_hits=int(
            data.get(
                "channel_semantic_recall_cross_channel_top_k_hits",
                cls.channel_semantic_recall_cross_channel_top_k_hits,
            ),
        ),
        channel_semantic_recall_cross_channel_min_similarity=float(
            data.get(
                "channel_semantic_recall_cross_channel_min_similarity",
                cls.channel_semantic_recall_cross_channel_min_similarity,
            ),
        ),
        channel_semantic_recall_cross_channel_max_total_chars=int(
            data.get(
                "channel_semantic_recall_cross_channel_max_total_chars",
                cls.channel_semantic_recall_cross_channel_max_total_chars,
            ),
        ),
        channel_semantic_recall_cross_channel_max_window_chars=int(
            data.get(
                "channel_semantic_recall_cross_channel_max_window_chars",
                cls.channel_semantic_recall_cross_channel_max_window_chars,
            ),
        ),
        channel_semantic_recall_cross_channel_neighbor_before=int(
            data.get(
                "channel_semantic_recall_cross_channel_neighbor_before",
                cls.channel_semantic_recall_cross_channel_neighbor_before,
            ),
        ),
        channel_semantic_recall_cross_channel_neighbor_after=int(
            data.get(
                "channel_semantic_recall_cross_channel_neighbor_after",
                cls.channel_semantic_recall_cross_channel_neighbor_after,
            ),
        ),
        channel_semantic_recall_cross_channel_lookback_messages=int(
            data.get(
                "channel_semantic_recall_cross_channel_lookback_messages",
                cls.channel_semantic_recall_cross_channel_lookback_messages,
            ),
        ),
        channel_semantic_recall_timeout_seconds=float(
            data.get(
                "channel_semantic_recall_timeout_seconds",
                cls.channel_semantic_recall_timeout_seconds,
            ),
        ),
        webhook_secret=data.get("webhook_secret", cls.webhook_secret),
        admin_panel_base_url=admin_panel_base_url,
        session_cookie_domain=session_cookie_domain,
        discord_oauth_client_id=discord_oauth.get("client_id", ""),
        discord_oauth_client_secret=discord_oauth.get("client_secret", ""),
        discord_oauth_redirect_uri=_oauth_redirect,
        oauth_encryption_key=oauth_cfg.get("encryption_key", ""),
        oauth_base_url=oauth_cfg.get("base_url", ""),
        oauth_providers=oauth_providers_raw,
        platforms=platform_configs,
        log_level=str(data.get("log_level", cls.log_level)),
        structured_logging=bool(data.get("structured_logging", cls.structured_logging)),
        skills_enabled=bool(skills_cfg.get("enabled", False)),
        skills_corpus_roots=skills_corpus_roots,
        skills_index_db=str(
            skills_cfg.get("index_db", cls.skills_index_db),
        ),
        skills_top_k=int(skills_cfg.get("top_k", cls.skills_top_k)),
        skills_similarity_threshold=float(
            skills_cfg.get(
                "similarity_threshold",
                cls.skills_similarity_threshold,
            ),
        ),
        skills_catalog_max_chars=int(
            skills_cfg.get(
                "catalog_max_chars",
                cls.skills_catalog_max_chars,
            ),
        ),
        mcpo_enabled=bool(mcpo_cfg.get("enabled", cls.mcpo_enabled)),
        mcpo_base_url=str(mcpo_cfg.get("base_url", cls.mcpo_base_url)),
        mcpo_api_key=str(mcpo_cfg.get("api_key", cls.mcpo_api_key)),
        mcpo_config_path=str(
            mcpo_cfg.get("config_path", cls.mcpo_config_path),
        ),
        dangerous_command_warning_enabled=bool(
            data.get(
                "dangerous_command_warning_enabled",
                cls.dangerous_command_warning_enabled,
            ),
        ),
        dangerous_command_guard_fail_mode=str(
            data.get(
                "dangerous_command_guard_fail_mode",
                getattr(cls, "dangerous_command_guard_fail_mode", "open"),
            ),
        ),
        dangerous_command_similarity_threshold=float(
            data.get(
                "dangerous_command_similarity_threshold",
                cls.dangerous_command_similarity_threshold,
            ),
        ),
        dangerous_command_benign_margin=float(
            data.get(
                "dangerous_command_benign_margin",
                cls.dangerous_command_benign_margin,
            ),
        ),
        tool_similarity_threshold=float(
            tc_cfg.get("similarity_threshold", cls.tool_similarity_threshold),
        ),
        tool_top_k=int(tc_cfg.get("top_k", cls.tool_top_k)),
        tool_strategy_force_threshold=float(
            tc_cfg.get(
                "strategy_force_threshold", cls.tool_strategy_force_threshold
            ),
        ),
        tool_strategy_optional_threshold=float(
            tc_cfg.get(
                "strategy_optional_threshold",
                cls.tool_strategy_optional_threshold,
            ),
        ),
        tool_group_expansion_threshold=float(
            tc_cfg.get(
                "group_expansion_threshold",
                cls.tool_group_expansion_threshold,
            ),
        ),
        tool_browser_similarity_threshold=float(
            tc_cfg.get(
                "browser_similarity_threshold",
                cls.tool_browser_similarity_threshold,
            ),
        ),
        overall_user_id_absolute_override_list=overall_user_id_absolute_override_list,
        overall_channel_id_absolute_override_list=overall_channel_id_absolute_override_list,
        egregores_global_disabled=bool(
            data.get("egregores_global_disabled", cls.egregores_global_disabled)
        ),
        loopfield_global_disabled=bool(
            data.get("loopfield_global_disabled", cls.loopfield_global_disabled)
        ),
        ego_ablation_global_disabled=bool(
            data.get("ego_ablation_global_disabled", cls.ego_ablation_global_disabled)
        ),
        proactive_global_disabled=bool(
            data.get("proactive_global_disabled", cls.proactive_global_disabled)
        ),
        ncm_global_disabled=bool(
            data.get("ncm_global_disabled", cls.ncm_global_disabled)
        ),
        flash_mirror_global_disabled=bool(
            data.get("flash_mirror_global_disabled", cls.flash_mirror_global_disabled)
        ),
        anamnesis_global_disabled=bool(
            data.get("anamnesis_global_disabled", cls.anamnesis_global_disabled)
        ),
        cart_lock_global_enabled=bool(
            data.get("cart_lock_global_enabled", cls.cart_lock_global_enabled)
        ),
        lore_amplifier_global_disabled=bool(
            data.get("lore_amplifier_global_disabled", cls.lore_amplifier_global_disabled)
        ),
        homeserver=homeserver,
        user_id=user_id_val,
        password=password,
        store_path=store_path,
        credentials_file=credentials_file,
    )

    # --- Environment variable overrides ---------------------------
    env_map = {
        "API_KEY": "api_key",
        "OPENROUTER_API_KEY": "api_key",
        "GEMINI_API_KEY": "gemini_api_key",
        "LLM_BASE_URL": "llm_base_url",
        "OPENROUTER_MODEL": "model",
        "OPENROUTER_TEMPERATURE": "temperature",
        "OPENROUTER_MAX_TOKENS": "max_tokens",
        "OPENROUTER_TOP_P": "top_p",
        "OPENROUTER_HTTP_READ_TIMEOUT_SECONDS": (
            "openrouter_http_read_timeout_seconds"
        ),
        "BOT_SYSTEM_PROMPT_FILE": "system_prompt_file",
        "BOT_MAX_HISTORY": "max_history",
        "BOT_TOOLS_DIR": "tools_dir",
        "SG_TOOLS_SERVICE_MODE": "tools_service_mode",
        "REDIS_URL": "redis_url",
        "REDIS_SENTINELS": "redis_sentinels",
        "REDIS_SENTINEL_MASTER": "redis_sentinel_master",
        "REDIS_TLS_CERT": "redis_tls_cert",
        "REDIS_TLS_KEY": "redis_tls_key",
        "REDIS_TLS_CA": "redis_tls_ca",
        "REDIS_TLS_VERIFY_PEER": "redis_tls_verify_peer",
        "REDIS_MAX_RETRIES": "redis_max_retries",
        "REDIS_HEALTH_CHECK_INTERVAL": "redis_health_check_interval",
        "REDIS_SOCKET_KEEPALIVE": "redis_socket_keepalive",
        "REDIS_SOCKET_CONNECT_TIMEOUT": "redis_socket_connect_timeout",
        "REDIS_SOCKET_TIMEOUT": "redis_socket_timeout",
        "EMBEDDING_MODEL": "embedding_model",
        "EMBEDDING_BATCH_SIZE": "embedding_batch_size",
        "EMBEDDING_FLUSH_INTERVAL": "embedding_flush_interval",
        "STARGAZER_VECTOR_PG_DSN": "vector_pg_dsn",
        "STARGAZER_VECTOR_PG_HOST": "vector_pg_host",
        "STARGAZER_VECTOR_PG_PORT": "vector_pg_port",
        "STARGAZER_VECTOR_PG_DATABASE": "vector_pg_database",
        "STARGAZER_VECTOR_PG_USER": "vector_pg_user",
        "STARGAZER_VECTOR_PG_PASSWORD": "vector_pg_password",
        "STARGAZER_VECTOR_PG_SSLMODE": "vector_pg_sslmode",
        "STARGAZER_VECTOR_PG_MIN_SIZE": "vector_pg_min_size",
        "STARGAZER_VECTOR_PG_MAX_SIZE": "vector_pg_max_size",
        "BOT_MEDIA_CACHE_DIR": "media_cache_dir",
        "BOT_MEDIA_CACHE_MAX_MB": "media_cache_max_mb",
        "BOT_MEDIA_DOWNLOAD_RETRY_ATTEMPTS": "media_download_retry_attempts",
        "STARGAZER_USER_SANDBOXES_DIR": "user_sandboxes_dir",
        "STARGAZER_USER_SANDBOX_QUOTA_BYTES": "user_sandbox_quota_bytes",
        "STARGAZER_USER_SANDBOX_QUOTA_MODE": "user_sandbox_quota_mode",
        "STARGAZER_USER_SANDBOX_LOOPBACK_DIR": "user_sandbox_loopback_dir",
        "STARGAZER_USER_SANDBOX_LOOPBACK_INDEX_PATH": (
            "user_sandbox_loopback_index_path"
        ),
        "STARGAZER_USER_SANDBOX_REMOUNT_ON_STARTUP": (
            "user_sandbox_remount_on_startup"
        ),
        "STARGAZER_TOR_GATEWAY_CONTAINER": "tor_gateway_container",
        "STARGAZER_SANDBOX_CURL_IMAGE": "sandbox_curl_image",
        "RESOLVE_EMOJIS_AS_IMAGES": "resolve_emojis_as_images",
        "MAX_EMOJIS_PER_MESSAGE": "max_emojis_per_message",
        "BOT_WEB_HOST": "web_host",
        "BOT_WEB_PORT": "web_port",
        "REDIS_PLATFORM_ADMIN_HOST": "redis_platform_admin_host",
        "REDIS_PLATFORM_ADMIN_PORT": "redis_platform_admin_port",
        # Admin panel
        "ADMIN_PANEL_BASE_URL": "admin_panel_base_url",
        "SESSION_COOKIE_DOMAIN": "session_cookie_domain",
        # Webhook
        "WEBHOOK_SECRET": "webhook_secret",
        # Discord OAuth2
        "DISCORD_OAUTH_CLIENT_ID": "discord_oauth_client_id",
        "DISCORD_OAUTH_CLIENT_SECRET": "discord_oauth_client_secret",
        "DISCORD_OAUTH_REDIRECT_URI": "discord_oauth_redirect_uri",
        # OAuth2 token management
        "OAUTH_ENCRYPTION_KEY": "oauth_encryption_key",
        "OAUTH_BASE_URL": "oauth_base_url",
        # LLM quality filter
        "LLM_FILTER_ENABLED": "llm_filter_enabled",
        # RLHF anti-pattern guardrail
        "RLHF_GUARDRAIL_ENABLED": "rlhf_guardrail_enabled",
        # Proactive responses
        "PROACTIVE_ENABLED": "proactive_enabled",
        "PROACTIVE_DEFAULT_FREQUENCY": "proactive_default_frequency",
        "PROACTIVE_TRIAGE_ENABLED": "proactive_triage_enabled",
        "PROACTIVE_TRIAGE_MODEL": "proactive_triage_model",
        "STARGAZER_BACKGROUND_SCHEDULER_CHAT_LLM_ENABLED": (
            "background_scheduler_chat_llm_enabled"
        ),
        "STARGAZER_BACKGROUND_SCHEDULER_LOG_RAG_INGEST_ENABLED": (
            "background_scheduler_log_rag_ingest_enabled"
        ),
        "STARGAZER_LEGACY_KG_EXTRACTION": "legacy_kg_extraction",
        # Enhanced Logging configuration env overrides
        "STARGAZER_LOG_LEVEL": "log_level",
        "STARGAZER_STRUCTURED_LOGGING": "structured_logging",
        # Legacy Matrix env vars
        "MATRIX_HOMESERVER": "homeserver",
        "MATRIX_USER_ID": "user_id",
        "MATRIX_PASSWORD": "password",
        "MATRIX_STORE_PATH": "store_path",
        "MATRIX_CREDENTIALS_FILE": "credentials_file",
        "STARGAZER_SKILLS_ENABLED": "skills_enabled",
        "MCPO_ENABLED": "mcpo_enabled",
        "MCPO_BASE_URL": "mcpo_base_url",
        "MCPO_API_KEY": "mcpo_api_key",
        "MCPO_CONFIG_PATH": "mcpo_config_path",
        "CURSOR_API_KEY": "cursor_api_key",
        "DANGEROUS_COMMAND_WARNING_ENABLED": "dangerous_command_warning_enabled",
        "DANGEROUS_COMMAND_GUARD_FAIL_MODE": "dangerous_command_guard_fail_mode",
        "DANGEROUS_COMMAND_SIMILARITY_THRESHOLD": (
            "dangerous_command_similarity_threshold"
        ),
        "DANGEROUS_COMMAND_BENIGN_MARGIN": "dangerous_command_benign_margin",
        "TOOL_SIMILARITY_THRESHOLD": "tool_similarity_threshold",
        "TOOL_TOP_K": "tool_top_k",
        "TOOL_STRATEGY_FORCE_THRESHOLD": "tool_strategy_force_threshold",
        "TOOL_STRATEGY_OPTIONAL_THRESHOLD": "tool_strategy_optional_threshold",
        "TOOL_GROUP_EXPANSION_THRESHOLD": "tool_group_expansion_threshold",
        "TOOL_BROWSER_SIMILARITY_THRESHOLD": "tool_browser_similarity_threshold",
        "STARGAZER_STARWIKI_ENABLED": "starwiki_enabled",
        "STARGAZER_STARWIKI_ROOT": "starwiki_root",
        "STARGAZER_STARWIKI_WORKER_MODEL": "starwiki_worker_model",
        "STARGAZER_STARWIKI_LINT_INTERVAL_MINUTES": (
            "starwiki_lint_interval_minutes"
        ),
    }
    for env_var, attr in env_map.items():
        val = os.environ.get(env_var)
        if val is not None:
            current = getattr(cfg, attr)
            # bool must be tested before int: bool is a subclass of int.
            if isinstance(current, bool):
                setattr(cfg, attr, val.lower() not in ("0", "false", "no", ""))
            elif attr == "top_p":
                if val.strip():
                    setattr(cfg, attr, float(val))
                else:
                    setattr(cfg, attr, cls.top_p)
            elif isinstance(current, float):
                setattr(cfg, attr, float(val))
            elif isinstance(current, int):
                setattr(cfg, attr, int(val))
            elif isinstance(current, list):
                # List-typed fields (e.g. ``redis_sentinels``) take a
                # comma-separated value, mirroring how the YAML string
                # form is normalised above. Storing the raw string would
                # make later iteration walk single characters.
                setattr(
                    cfg,
                    attr,
                    [item.strip() for item in val.split(",") if item.strip()],
                )
            else:
                setattr(cfg, attr, val)

    # Re-sanitize: LLM_BASE_URL may have replaced the YAML value above.
    cfg.llm_base_url = sanitize_llm_http_url(cfg.llm_base_url)

    # Discord token from env var
    discord_token = os.environ.get("DISCORD_TOKEN")
    if discord_token:
        # Check if a discord platform already exists
        has_discord = any(p.type == "discord" for p in cfg.platforms)
        if not has_discord:
            cfg.platforms.append(
                PlatformConfig(
                    type="discord",
                    enabled=True,
                    settings={"token": discord_token},
                )
            )
        else:
            # Update existing discord platform token
            for p in cfg.platforms:
                if p.type == "discord":
                    p.settings["token"] = discord_token

    # OAuth provider credentials from env vars
    for provider in ("github", "google", "discord", "microsoft"):
        env_prefix = f"OAUTH_{provider.upper()}"
        cid = os.environ.get(f"{env_prefix}_CLIENT_ID", "")
        csecret = os.environ.get(f"{env_prefix}_CLIENT_SECRET", "")
        if cid or csecret:
            if provider not in cfg.oauth_providers:
                cfg.oauth_providers[provider] = {}
            if cid:
                cfg.oauth_providers[provider]["client_id"] = cid
            if csecret:
                cfg.oauth_providers[provider]["client_secret"] = csecret

    roots_s = os.environ.get("STARGAZER_SKILLS_CORPUS_ROOTS", "").strip()
    if roots_s:
        cfg.skills_corpus_roots = [
            p.strip() for p in roots_s.split(",") if p.strip()
        ]

    # Resolve Redis mTLS paths relative to the config file so certificate
    # paths like ``tls/client.crt`` work regardless of process CWD (systemd,
    # pytest, scripts).
    _cfg_dir = config_path.resolve().parent

    def _abs_redis_tls_path(p: str) -> str:
        """Resolve a Redis mTLS path relative to the config file's directory.

        Closes over the enclosing ``_cfg_dir`` local. Empty or
        whitespace-only values are returned untouched and absolute paths
        are returned as a plain string; relative paths are joined onto
        ``_cfg_dir`` and resolved.

        Args:
            p (str): A certificate path, possibly empty, relative, or
                already absolute.

        Returns:
            str: *p* when empty or already absolute; otherwise the
            absolute, resolved path under ``_cfg_dir``.
        """
        if not p or not str(p).strip():
            return p
        pp = Path(p)
        if pp.is_absolute():
            return str(pp)
        return str((_cfg_dir / pp).resolve())

    cfg.redis_tls_cert = _abs_redis_tls_path(cfg.redis_tls_cert)
    cfg.redis_tls_key = _abs_redis_tls_path(cfg.redis_tls_key)
    cfg.redis_tls_ca = _abs_redis_tls_path(cfg.redis_tls_ca)

    if cfg.mcpo_config_path and not Path(cfg.mcpo_config_path).is_absolute():
        cfg.mcpo_config_path = str(
            (_cfg_dir / cfg.mcpo_config_path).resolve(),
        )

    # --- Webhook hygiene (empty secret => /api/webhook disabled at runtime) ---
    if not (cfg.webhook_secret or "").strip():
        wh = (cfg.web_host or "").strip().lower()
        public_bind = False
        if wh in ("0.0.0.0", "::", "[::]"):
            public_bind = True
        else:
            try:
                ip = ipaddress.ip_address(wh)
                public_bind = not (ip.is_loopback or ip.is_link_local)
            except ValueError:
                # Not an IP literal: treat any hostname other than the
                # well-known loopback names as a public bind.
                public_bind = wh not in ("127.0.0.1", "localhost", "::1")
        if public_bind:
            _cfg_log.warning(
                "webhook_secret is empty — /api/webhook returns 401 until set "
                "(web_host=%s)",
                cfg.web_host,
            )

    return cfg
def redis_ssl_kwargs(self) -> dict:
    """Build the TLS keyword arguments to hand to a redis client.

    mTLS counts as configured only when both :attr:`redis_tls_cert` and
    :attr:`redis_tls_key` are truthy. Otherwise an empty dict is returned,
    so callers can always unpack the result into ``Redis()`` /
    ``from_url()`` without a separate check.

    Peer verification follows :attr:`redis_tls_verify_peer`: when it is
    truthy ``ssl_cert_reqs`` is ``"required"``, otherwise ``"none"``, and
    the flag itself is passed through as ``ssl_check_hostname``.

    Returns:
        dict: Empty when mTLS is not configured; otherwise the
        ``ssl_keyfile``, ``ssl_certfile``, ``ssl_ca_certs``,
        ``ssl_cert_reqs`` and ``ssl_check_hostname`` entries.
    """
    client_cert = self.redis_tls_cert
    client_key = self.redis_tls_key
    if client_cert and client_key:
        verify_peer = self.redis_tls_verify_peer
        if verify_peer:
            cert_requirement = "required"
        else:
            cert_requirement = "none"
        return {
            "ssl_keyfile": client_key,
            "ssl_certfile": client_cert,
            "ssl_ca_certs": self.redis_tls_ca,
            "ssl_cert_reqs": cert_requirement,
            "ssl_check_hostname": verify_peer,
        }
    # Missing certificate or key: plain (non-TLS) connection.
    return {}
def redis_resilience_kwargs(self) -> dict[str, Any]:
    """Assemble retry, keepalive and timeout options for ``redis.asyncio``.

    The returned mapping always carries a retry policy (exponential
    backoff, applied to connection and timeout errors) and the
    ``socket_keepalive`` flag. Each call constructs a new
    :class:`~redis.asyncio.retry.Retry`, so two pools built from separate
    calls never share a retry object.

    ``health_check_interval``, ``socket_connect_timeout`` and
    ``socket_timeout`` are added only when the matching ``redis_*``
    setting is truthy and greater than zero. Leaving ``socket_timeout``
    out by default is deliberate (design note carried over from the
    original documentation): a global read timeout would cut off
    long-lived blocking reads such as ``XREAD`` / ``SUBSCRIBE``.

    Returns:
        dict[str, Any]: Keyword arguments for a ``redis.asyncio`` client.
    """
    from redis.asyncio.retry import Retry
    from redis.backoff import ExponentialBackoff
    from redis.exceptions import ConnectionError as RedisConnectionError
    from redis.exceptions import TimeoutError as RedisTimeoutError

    # Negative retry counts are clamped to zero.
    attempts = max(0, int(self.redis_max_retries))
    backoff = ExponentialBackoff(cap=1.0, base=0.05)

    options: dict[str, Any] = {
        "retry": Retry(backoff, attempts),
        "retry_on_error": [RedisConnectionError, RedisTimeoutError],
        "socket_keepalive": bool(self.redis_socket_keepalive),
    }

    # Optional numeric knobs: emitted as floats, and only when positive.
    optional_settings = (
        ("health_check_interval", self.redis_health_check_interval),
        ("socket_connect_timeout", self.redis_socket_connect_timeout),
        ("socket_timeout", self.redis_socket_timeout),
    )
    for option_name, configured in optional_settings:
        if configured and configured > 0:
            options[option_name] = float(configured)

    return options
def build_async_redis_client(self, *, decode_responses: bool = True) -> Any:
    """Build a Sentinel-aware, failover-resilient ``redis.asyncio`` client.

    When :attr:`redis_sentinels` is non-empty the client is obtained from
    ``Sentinel.master_for`` for :attr:`redis_sentinel_master`; otherwise
    it is created with ``from_url`` against :attr:`redis_url`. Both paths
    receive the options from :meth:`redis_resilience_kwargs`.

    Sentinel entries are parsed leniently. Surrounding whitespace is
    ignored and blank entries are skipped. Accepted forms are ``host``,
    ``host:port``, ``[ipv6]`` and ``[ipv6]:port``; an unbracketed value
    with several colons is treated as a bare IPv6 literal. A missing or
    empty port falls back to 26379.

    Args:
        decode_responses (bool): Forwarded to the redis client.

    Returns:
        Any: The ``redis.asyncio`` client instance.

    Raises:
        ValueError: If a sentinel entry carries a non-numeric port.
    """
    import redis.asyncio as aioredis
    from redis.asyncio.sentinel import Sentinel

    default_sentinel_port = 26379

    def _parse_sentinel(entry: str) -> tuple[str, int]:
        """Split one sentinel address into a ``(host, port)`` pair."""
        text = entry.strip()
        if text.startswith("["):
            # Bracketed IPv6 literal, optionally followed by ":port".
            host, _, tail = text[1:].partition("]")
            port_text = tail[1:] if tail.startswith(":") else ""
        elif text.count(":") == 1:
            host, _, port_text = text.partition(":")
        else:
            # Bare hostname, or an unbracketed IPv6 literal with no port.
            host, port_text = text, ""
        port_text = port_text.strip()
        port = int(port_text) if port_text else default_sentinel_port
        return host.strip(), port

    resilience = self.redis_resilience_kwargs()

    if self.redis_sentinels:
        ssl_options = dict(self.redis_ssl_kwargs())
        if ssl_options:
            # Sentinel/master connections only use TLS when asked explicitly.
            ssl_options["ssl"] = True

        sentinels: list[tuple[str, int]] = [
            _parse_sentinel(entry)
            for entry in self.redis_sentinels
            if entry and entry.strip()
        ]

        sentinel = Sentinel(
            sentinels,
            sentinel_kwargs=ssl_options,
            **{**ssl_options, **resilience},
        )
        return sentinel.master_for(
            self.redis_sentinel_master,
            decode_responses=decode_responses,
            **resilience,
        )

    # TLS kwargs are only valid for rediss:// URLs; the helper filters them.
    ssl_kw = self.redis_connection_kwargs_for_url(self.redis_url)
    return aioredis.from_url(
        self.redis_url,
        decode_responses=decode_responses,
        **{**ssl_kw, **resilience},
    )
def redis_connection_kwargs_for_url(self, url: str) -> dict[str, Any]:
    """Return the SSL options to pass to ``from_url`` for *url*.

    TLS keyword arguments are only produced for ``rediss://`` URLs. A
    ``redis://`` URL selects a non-SSL connection class that does not
    accept ``ssl_*`` parameters, so an empty dict is returned for it.

    For a ``rediss://`` URL the base options come from
    :meth:`redis_ssl_kwargs`. When the host is a loopback name
    (``localhost``, ``127.0.0.1`` or ``::1``) and those options do not
    already decide the matter, certificate and hostname verification are
    relaxed. If the URL cannot be parsed, the base options are returned
    untouched.

    Args:
        url (str): The Redis connection URL.

    Returns:
        dict[str, Any]: Keyword arguments for ``redis.asyncio.from_url``;
        empty for non-``rediss://`` URLs.
    """
    if not url.startswith("rediss://"):
        return {}

    import logging
    from urllib.parse import urlparse

    kwargs = self.redis_ssl_kwargs()

    try:
        parsed = urlparse(url)
    except ValueError as exc:
        # urlparse rejects malformed netlocs (e.g. an unterminated "[").
        # Best effort: keep the configured options and skip the relaxation.
        logging.getLogger(__name__).debug(
            "Could not parse Redis URL host; "
            "skipping loopback TLS relaxation: %s",
            exc,
        )
        return kwargs

    # Normalize hostname (strip brackets for IPv6 loopback [::1]).
    hostname = (parsed.hostname or "").lower().strip("[] ").strip()
    if hostname in ("localhost", "127.0.0.1", "::1"):
        # Never override a verification mode chosen by the mTLS settings.
        kwargs.setdefault("ssl_cert_reqs", "none")
        kwargs.setdefault("ssl_check_hostname", False)
    return kwargs
def __post_init__(self) -> None:
    """Sync the ego-ablation flag and downgrade local ``rediss://`` URLs.

    Two things happen here:

    1. :attr:`ego_ablation_enabled` is copied onto the ``enabled``
       attribute of the ``ego_ablation`` module.
    2. If :attr:`redis_url` uses ``rediss://``, points at a loopback host
       (``localhost``, ``127.0.0.1`` or ``::1``) and no client cert/key
       pair is configured, the scheme is rewritten to ``redis://``.

    A URL that ``urlparse`` rejects is left unchanged; the rewrite is
    best effort and never raises.

    Returns:
        None
    """
    import ego_ablation

    ego_ablation.enabled = self.ego_ablation_enabled

    secure_prefix = "rediss://"
    if not (self.redis_url and self.redis_url.startswith(secure_prefix)):
        return

    # Automatically downgrade local rediss:// URLs to redis:// ONLY when
    # mTLS is not configured; with a cert/key pair TLS stays on.
    if self.redis_tls_cert and self.redis_tls_key:
        return

    from urllib.parse import urlparse

    try:
        parsed = urlparse(self.redis_url)
    except ValueError:
        # Malformed netloc (e.g. an unterminated "["): keep the URL as given.
        return

    # Normalize hostname (strip brackets for IPv6 loopback [::1]).
    hostname = (parsed.hostname or "").lower().strip("[] ").strip()
    if hostname in ("localhost", "127.0.0.1", "::1"):
        self.redis_url = "redis://" + self.redis_url[len(secure_prefix):]