"""Bot configuration loaded from config.yaml with environment variable overrides.
Supports a ``platforms`` list so multiple chat platforms (Matrix, Discord, ...)
can be configured independently alongside the shared LLM / web settings.
"""
from __future__ import annotations
import os
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any
import yaml
from user_llm_config import sanitize_llm_http_url
import base64
import logging
import socket
from datetime import datetime
from typing import Literal, Optional
from pydantic import BaseModel, Field, field_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from observability import observability
logger = logging.getLogger(__name__)
[docs]
class RedisSSLConfig(BaseModel):
    """Connection and mutual-TLS settings for the Redis endpoint.

    ``host`` and the three credential paths are required; ``port`` defaults to
    6379 and must fall in 1-65535. Each credential path is checked at
    model-construction time by :meth:`validate_paths_exist`.
    """

    enabled: bool = True
    host: str = Field(..., min_length=1)
    port: int = Field(6379, ge=1, le=65535)
    ssl_certfile: str = Field(..., description="Path to client mTLS certificate")
    ssl_keyfile: str = Field(..., description="Path to client mTLS private key")
    ssl_ca_certs: str = Field(..., description="Path to trusted Redis CA root")
    ssl_cert_reqs: Literal["required", "none"] = "required"

    @field_validator("ssl_certfile", "ssl_keyfile", "ssl_ca_certs")
    @classmethod
    def validate_paths_exist(cls, v: str) -> str:
        """Validate that a Redis mTLS credential path points at a real file.

        Pydantic field validator wired to the ``ssl_certfile``, ``ssl_keyfile``
        and ``ssl_ca_certs`` fields. The path is accepted only when
        :meth:`pathlib.Path.is_file` is true, so a missing certificate, key or
        CA root (or a directory supplied by mistake) is rejected when the
        config is loaded instead of failing later during the TLS handshake.

        Args:
            v (str): Candidate filesystem path for a credential file.

        Returns:
            str: The same path *v*, unchanged, when it names an existing file.

        Raises:
            ValueError: If *v* is not an existing regular file; pydantic
                surfaces this as a validation error.
        """
        if not Path(v).is_file():
            # ``is_file()`` is also false for directories, so say so: the old
            # "does not exist" wording was misleading in that case.
            raise ValueError(
                "Cryptographic credential path does not exist "
                f"or is not a regular file: {v}"
            )
        return v
[docs]
class BotSettings(BaseSettings):
    """Bot settings read from ``STAR_``-prefixed variables and a ``.env`` file.

    Keys that do not correspond to a declared field are ignored
    (``extra="ignore"``).
    """

    model_config = SettingsConfigDict(
        env_file=".env",
        env_prefix="STAR_",
        extra="ignore",
    )

    # Required; must be at least 10 characters long.
    bot_token: str = Field(default=..., min_length=10)
    admin_user_ids: list[str] = Field(default_factory=list)

    dangerous_command_warning_enabled: bool = True
    # Constrained to the closed interval [0.0, 1.0].
    dangerous_command_similarity_threshold: float = Field(
        default=0.35, ge=0.0, le=1.0
    )

    # Required nested Redis mTLS settings.
    redis: RedisSSLConfig
[docs]
class RuntimeState:
    """Mutable holder for the wallet-key status established at boot.

    The class-level values describe the "no key loaded" state. Assigning to an
    instance (as ``load_wallet_key_non_halting`` does) shadows them for that
    instance only.
    """

    # Raw key bytes, or ``None`` while no valid key has been loaded.
    wallet_master_key: Optional[bytes] = None
    # ``True`` only after a valid key has been stored on the instance.
    wallet_key_active: bool = False
[docs]
def _deactivate_wallet_key(
    state: RuntimeState,
    platform_type: str,
    alert_message: str,
    error: Optional[str] = None,
) -> None:
    """Send a CRITICAL observability alert and clear the wallet key on *state*."""
    # Local import: the module header only pulls ``datetime`` from ``datetime``.
    from datetime import timezone

    metadata = {
        "hostname": socket.gethostname(),
        "component": "wallet_key_utils",
        "platform": platform_type,
        # Timezone-aware replacement for the deprecated ``datetime.utcnow()``;
        # the rendered value keeps the same ``...Z`` form as before.
        "timestamp": datetime.now(timezone.utc).isoformat().replace("+00:00", "Z"),
    }
    if error is not None:
        metadata["error"] = error
    observability.alert("CRITICAL", alert_message, metadata=metadata)
    state.wallet_key_active = False
    state.wallet_master_key = None


def load_wallet_key_non_halting(
    state: RuntimeState, platform_type: str = "eth"
) -> None:
    """Load a wallet master key from the environment without aborting startup.

    Reads ``STAR_{PLATFORM}_WALLET_MASTER_KEY``, decodes it as URL-safe base64
    and requires exactly 32 bytes (256-bit). On success the raw bytes are
    stored on *state* and ``wallet_key_active`` is set to ``True``.

    A missing or malformed key is treated as a degraded-but-bootable
    condition: the problem is logged at ERROR level (the missing-key message
    is an ASCII-bordered "SECURITY CRITICAL WARNING"), a ``CRITICAL``
    :func:`observability.alert` is sent carrying the hostname, the
    ``wallet_key_utils`` component, the platform and a UTC timestamp, and both
    wallet fields on *state* are cleared. Decoding errors are caught here
    rather than propagated.

    Args:
        state (RuntimeState): Mutable runtime container whose
            ``wallet_master_key`` and ``wallet_key_active`` fields are set in
            place to reflect the outcome.
        platform_type (str): Chain/platform selector (e.g. ``"eth"``),
            uppercased into the environment variable name.

    Returns:
        None: All results are communicated by mutating *state*.
    """
    env_var_name = f"STAR_{platform_type.upper()}_WALLET_MASTER_KEY"
    b64_key = os.environ.get(env_var_name)

    if not b64_key:
        border = "#" * 80
        logger.error(
            f"\n{border}\n"
            f"SECURITY CRITICAL WARNING: Missing environment variable '{env_var_name}'.\n"
            "Stargazer v3 will boot in DEGRADED STATE. Wallet features are completely deactivated.\n"
            f"{border}"
        )
        _deactivate_wallet_key(
            state,
            platform_type,
            "Wallet master cryptographic keys are missing. Startup proceeding in degraded mode.",
        )
        return

    try:
        # NOTE(review): ``urlsafe_b64decode`` discards non-alphabet characters
        # by default, so only the decoded length is enforced here; consider
        # strict validation if corrupted keys must be rejected outright.
        raw_key = base64.urlsafe_b64decode(b64_key.encode())
        if len(raw_key) != 32:
            raise ValueError(
                f"Invalid key length: must be exactly 32 bytes (256-bit). Got {len(raw_key)} bytes."
            )
    except Exception as e:  # deliberate boundary: boot must not abort here
        logger.error(
            "Failed to decode cryptographic master key: %s", e, exc_info=True
        )
        _deactivate_wallet_key(
            state,
            platform_type,
            "Wallet master key decoding failure. Wallet services deactivated.",
            error=str(e),
        )
        return

    state.wallet_master_key = raw_key
    state.wallet_key_active = True
    logger.info("Successfully loaded wallet master key. Wallet services active.")
# Channels (``platform:channel_id``) for which the entire NCM stack
# (inhale/exhale/tools/cadence) is off by default.
DEFAULT_NCM_FULLY_DISABLED_CHANNELS: frozenset[str] = frozenset(
    (
        "discord:1361023131107590184",  # Aesir-Hall (Discord)
    )
)

DEFAULT_KG_FULL_USER_MEMORY_IDS: tuple[str, ...] = (
    "281484046150926336",
)

# Shell-tool allowlist applied when ``shell_authorized_user_ids`` is omitted
# from YAML.
_DEFAULT_SHELL_AUTHORIZED_USER_IDS: tuple[str, ...] = (
    "82303438955753472",
    "1063654597937336372",
    "517538246788513821",
    "389067553940439053",
    "829047047633764402",
)
def _deep_merge_dict(base: dict[str, Any], overlay: dict[str, Any]) -> dict[str, Any]:
"""Recursively merge *overlay* on top of *base*, returning a new dict.
Produces a deep union of two plain mapping trees: where both sides hold a
nested dict under the same key the function recurses, and for every other
key the ``overlay`` value wins outright. Neither input is mutated — a fresh
``dict`` is built at each level — so callers can reuse the originals safely.
Used purely for config layering: :meth:`Config.load` calls it to fold the
optional ``data/skills_runtime.yaml`` overlay onto the base ``config.yaml``
data, and it recurses into itself for nested sections. It is also exercised
directly by ``tests/test_config_merge.py``. Pure in-memory computation with
no Redis/KG/LLM/HTTP/filesystem side effects.
Args:
base (dict[str, Any]): The lower-priority mapping (e.g. ``config.yaml``).
overlay (dict[str, Any]): The higher-priority mapping whose values take
precedence on conflicts.
Returns:
dict[str, Any]: A new merged mapping; values from *overlay* override
those in *base* except where both are dicts, which are merged in turn.
"""
out = dict(base)
for key, val in overlay.items():
if key in out and isinstance(out[key], dict) and isinstance(val, dict):
out[key] = _deep_merge_dict(out[key], val)
else:
out[key] = val
return out
[docs]
@dataclass
class Config:
# --- Shared LLM settings ---
"""Central runtime configuration for the whole bot, loaded from YAML and env.
The single source of truth for every tunable in the system — LLM/OpenRouter
settings, Redis/Sentinel and mTLS wiring, the pgvector store, the knowledge
graph, Limbic Anchoring, persona preferences, the web UI, OAuth, per-platform
bindings, and dozens of feature toggles. It is built once via
:meth:`Config.load`, which reads ``config.yaml`` (plus an optional skills
overlay), applies environment-variable overrides, and normalises paths and
lists into the typed fields declared here.
A single loaded instance is shared across all five microservices (gateway,
inference, agents, consolidation, web); each service calls
:meth:`Config.load` at startup and threads the object through to tools via
the ``ToolContext``. Beyond holding data it exposes a few behavioural
helpers — :meth:`redis_ssl_kwargs`, :meth:`redis_resilience_kwargs`, and
:meth:`build_async_redis_client` — that turn these fields into live
``redis.asyncio`` clients, and :meth:`__post_init__` syncs the global
``ego_ablation`` flag. A dataclass; construction itself performs no network
I/O beyond the path canonicalisation done in :meth:`load`.
"""
api_key: str = ""
gemini_api_key: str = ""
llm_base_url: str = "http://localhost:3000/openai"
model: str = "x-ai/grok-4.1-fast"
temperature: float = 0.7
max_tokens: int = 60000
# Nucleus sampling (OpenAI-compatible chat-completions); default 0.99.
top_p: float = 0.99
# httpx.AsyncClient timeouts for OpenRouter chat-completions (see openrouter_client.OpenRouterClient)
openrouter_http_connect_timeout_seconds: float = 10.0
openrouter_http_read_timeout_seconds: float = 1200.0
openrouter_http_write_timeout_seconds: float = 120.0
openrouter_http_pool_timeout_seconds: float = 180.0
system_prompt_file: str = "system_prompt.j2"
max_history: int = 100
tools_dir: str = "tools"
# --- Dedicated tools-execution microservice (stargazer-tools) ---
tools_service_mode: str = "in_process"
"""How the inference tier executes tools.
* ``"in_process"`` (default): run tools locally, exactly as before the split.
Safe baseline — the dedicated tools service can be deployed dormant.
* ``"remote"``: delegate non-pinned tools to the dedicated ``tools`` service
over ``sg:stream:tools``; gateway-pinned tools still go to the gateway and
inference-pinned compound tools (``INFERENCE_PINNED_TOOLS``) still run
locally.
* ``"shadow"``: run remote AND in-process, keep the in-process result, log
divergences (rollout validation only)."""
tools_exec_timeout: float = 120.0
"""Seconds the inference tier waits for a delegated tool result before giving
up. Must exceed the TaskManager background timeout so legitimately slow tools
are not abandoned early."""
tools_local_fallback: bool = True
"""When delegation fails (tools service down / timeout), fall back to running
the tool in-process for that call. Only meaningful while inference still
imports the tool modules (rollout). Set ``False`` for the lean end state, where
a delegation failure returns a recoverable error string to the model instead."""
tools_force_in_process: list[str] = field(default_factory=list)
"""Per-tool kill switch: tool names that must always run in-process on
inference regardless of ``tools_service_mode`` (granular rollback)."""
tools_require_session_record: bool = True
"""When True (default), the tools service rejects any delegated request whose
``trace_id`` has no authenticated session record (written by inference) —
identity is resolved from that record, never from the request envelope. Set
False only as a rollout escape hatch (falls back to envelope identity with a
warning); not recommended once privileged tools run remotely."""
# --- Tool permissions (tool_name -> list of allowed user IDs) ---
tool_permissions: dict[str, list[str]] = field(default_factory=dict)
# --- API keys for external services (Brave Search, Vultr, etc.) ---
api_keys: dict[str, Any] = field(default_factory=dict)
# --- Prowlarr (Docker *arr stack; indexer search API on the host) ---
prowlarr_base_url: str = ""
prowlarr_api_key: str = ""
# --- Cursor IDE (cursor-agent CLI used by send_cursor_prompt,
# import_mcp_tool, and workflow_subagent_tools). Resolution order
# in tools/_cursor_api_key.py: per-user `set_user_api_key
# service=cursor` → channel/global pool → ``CURSOR_API_KEY`` env →
# this field. Never inline the key in tool source — READ_TOOL_CODE
# would expose it.
cursor_api_key: str = ""
# --- Redis message cache ---
redis_url: str = ""
redis_sentinels: list[str] = field(default_factory=list)
redis_sentinel_master: str = "falkordb"
redis_tls_cert: str = ""
redis_tls_key: str = ""
redis_tls_ca: str = ""
redis_tls_verify_peer: bool = True
# --- Redis failover resilience ---
# Number of automatic retries (with exponential backoff) applied to every
# Redis command. This lets reads/writes ride through a Sentinel failover
# window (master re-election) instead of raising on the first attempt.
redis_max_retries: int = 3
# Periodic PING health-check interval (seconds) so stale connections to a
# demoted master are detected and recycled. 0 disables.
redis_health_check_interval: float = 15.0
# Enable TCP keepalive so dead peers are noticed promptly.
redis_socket_keepalive: bool = True
# Connection-establishment timeout (seconds). Does NOT affect blocking reads
# (XREAD/SUBSCRIBE), so it is safe to set on the shared client. 0 disables.
redis_socket_connect_timeout: float = 5.0
# Per-command socket read timeout (seconds). Left at 0 (disabled) by default
# because the MessageCache client is shared with blocking stream/pubsub
# consumers; a global read timeout would break their long-lived XREAD/listen.
redis_socket_timeout: float = 0.0
embedding_model: str = "google/gemini-embedding-001"
# --- Deferred embedding batching ---
embedding_batch_size: int = 50
embedding_flush_interval: float = 3600.0
# --- Vector store (Postgres + pgvector) ---
# Backs all RAG / memory vector storage (see vector_store.py). One DB,
# one schema per store, halfvec(3072) + HNSW (L2). A full DSN, if set,
# overrides the individual host/port/etc fields.
vector_pg_dsn: str = ""
vector_pg_host: str = "10.10.0.7"
vector_pg_port: int = 5432
vector_pg_database: str = "stargazer_filerag"
vector_pg_user: str = "stargazer"
vector_pg_password: str = ""
vector_pg_sslmode: str = "prefer"
vector_pg_min_size: int = 1
vector_pg_max_size: int = 8
# --- Knowledge graph ---
kg_extraction_model: str = "gemini-3-flash"
kg_max_hops: int = 2
# Per-entity-label vector KNN pool before seed thresholding and cap.
kg_seed_top_k: int = 64
kg_seed_limit: int = 15
# FalkorDB cosine scores for this stack are often ~0.35–0.45 for
# plausible matches; stricter values (e.g. 0.65) are supported when
# your index/embeddings produce higher similarities.
kg_seed_similarity_threshold: float = 0.38
# Relax seed similarity floor for category=user when retrieved/stored
# ratio is below target (helps heavy-memory users surface more KG facts).
kg_seed_dynamic_threshold_enabled: bool = True
kg_seed_dynamic_threshold_target_ratio: float = 0.10
kg_seed_dynamic_threshold_min: float = 0.20
kg_seed_dynamic_threshold_min_stored: int = 5
kg_full_user_memory_ids: list[str] = field(
default_factory=lambda: list(DEFAULT_KG_FULL_USER_MEMORY_IDS),
)
kg_user_seed_min: int = 10
kg_user_candidate_limit: int = 100
kg_lore_candidate_limit: int = 40
kg_lore_seed_min: int = 5
kg_meta_candidate_limit: int = 40
kg_meta_seed_min: int = 5
kg_recent_speaker_limit: int = 10
kg_min_edge_weight: float = 0.0
kg_default_edge_weight: float = 0.8
kg_retrieval_hop_decay: float = 0.8
# Max neighbors returned from graph expansion (0 = unlimited).
kg_expansion_neighbor_limit: int = 500
kg_max_context_entities: int = 30
kg_entity_dedup_threshold: float = 0.90
kg_relationship_decay_factor: float = 0.95
kg_per_message_extraction: bool = False
kg_min_message_length: int = 100
kg_per_user_extraction_limit: int = 5
kg_extraction_channel_hints: dict[str, str] = field(default_factory=dict)
"""Optional `platform:channel_id` -> human label for KG extraction prompts."""
# --- Persona preference memory ---
persona_preferences_enabled: bool = True
"""Master toggle for the persona preference memory system."""
persona_pref_extraction_enabled: bool = True
"""Whether auto-extraction of preferences from bot responses is active."""
persona_pref_extraction_rate_limit_seconds: int = 180
"""Minimum seconds between auto-extractions per channel."""
persona_pref_extraction_min_response_length: int = 150
"""Skip extraction for responses shorter than this many characters."""
persona_pref_injection_max_count: int = 8
"""Maximum number of preferences to inject per inference."""
persona_pref_injection_max_chars: int = 2000
"""Character budget for injected preferences per inference."""
persona_pref_base_persona_id: str = "stargazer"
"""Identifier for the base persona (used when no egregore is active)."""
# --- Narrative Subsystem Overrides & Absolute Bypass Lists ---
overall_user_id_absolute_override_list: list[str] = field(
default_factory=lambda: ["517538246788513821", "82303438955753472", "1063654597937336372"]
)
overall_channel_id_absolute_override_list: list[str] = field(
default_factory=lambda: ["discord:1424991476668170351"]
)
egregores_global_disabled: bool = False
loopfield_global_disabled: bool = False
ego_ablation_global_disabled: bool = False
proactive_global_disabled: bool = False
ncm_global_disabled: bool = False
flash_mirror_global_disabled: bool = False
anamnesis_global_disabled: bool = True
cart_lock_global_enabled: bool = False
lore_amplifier_global_disabled: bool = False
# --- Mementropic (NCM ledger + late fusion + embedding drift on recall) ---
mementropic_late_fusion_enabled: bool = True
mementropic_semantic_weight: float = 0.65
mementropic_resonance_weight: float = 0.35
mementropic_ledger_max_entries: int = 50_000
mementropic_reconsolidation_enabled: bool = True
mementropic_reconsolidation_learning_rate: float = 0.03
mementropic_reconsolidation_max_step: float = 0.25
mementropic_reconsolidation_max_entities: int = 24
# --- Limbic Anchoring & Sunset Processor ---
ka_enabled: bool = True
"""Master toggle for the Limbic Anchoring system."""
# Worker
ka_batch_size: int = 20
ka_sleep_interval_seconds: int = 300
ka_max_anchors_per_channel_per_day: int = 300
ka_dlq_retry_limit: int = 3
ka_backpressure_threshold: int = 500
ka_backpressure_recovery: int = 50
ka_batch_min_size: int = 10
"""Minimum backlog depth required before a batch is processed.
Channels with fewer than this many pending messages are skipped
(deferred) until more accumulate, preventing wasteful single-message
LLM calls. Default: 10."""
ka_epoch_size: int = 5000
ka_fast_path_prefix: str = "!remember"
"""Command prefix that triggers the fast-path explicit anchoring route."""
ka_extraction_http_timeout_seconds: float = 360.0
"""Read timeout (seconds) for the extraction LLM HTTP call via the local proxy.
Large batches with rich context can take 150-250 s on Gemini Flash; 360 s
provides a comfortable margin. Raise further if timeouts persist on
backpressure-doubled batches (up to ~40 messages)."""
ka_bootstrap_mode: str = "full"
"""Bootstrap mode for new channels: 'latest' (jump to present) or 'full' (start from beginning)."""
ka_max_backlog_messages: int = 1000
"""Max messages to look back when bootstrapping with 'latest' (e.g. to catch very recent context)."""
# Heuristic Gates
ka_noise_gate_enabled: bool = True
"""Toggle for the heuristic noise filtering system. If False, all messages
within the daily budget are anchored."""
ka_length_minimum: int = 20
ka_similarity_familiar: float = 0.45
ka_similarity_anomaly: float = 0.15
ka_density_threshold: float = 0.4
ka_tool_rate_limit_per_user_per_hour: int = 3
ka_significant_tools: list = field(
default_factory=lambda: [
"search_web",
"read_file",
"kg_search_entities",
"unsandboxed_python_tool",
]
)
"""Tool whitelist for Tool Gate; defaults to a safe production set."""
# Catch-up Gate — relaxed thresholds applied when status=="catch-up"
# (backlog > ka_backpressure_threshold). Wider similarity band and a
# higher daily budget allow high-backlog channels to drain faster.
ka_catchup_enabled: bool = True
"""Toggle catch-up gate relaxation. Set False to use normal thresholds always."""
ka_catchup_similarity_familiar: float = 0.55
"""Familiar-path threshold during catch-up mode (wider than normal 0.45)."""
ka_catchup_similarity_anomaly: float = 0.20
"""Anomaly-path threshold during catch-up mode (lower noise band floor)."""
ka_catchup_density_threshold: float = 0.40
"""Density threshold during catch-up mode."""
ka_catchup_max_anchors_per_day: int = 3000
"""Daily anchor budget per channel during catch-up mode."""
# Sunset Processor
ka_sunset_interval_seconds: int = 43200
ka_sunset_window_start_days: int = 90
ka_sunset_window_end_days: int = 80
ka_sunset_daily_summaries_per_epoch: int = 10
ka_sunset_token_reduction_threshold: int = 60000
ka_sunset_hydration_similarity: float = 0.92
# Garbage Collector
ka_gc_orphan_age_days: int = 95
# Deduplication Worker
ka_dedup_enabled: bool = True
"""Master toggle for the background KG deduplication worker."""
ka_dedup_interval_seconds: int = 300
"""How often (seconds) the dedup cycle runs. Default: 300 (every 5 minutes)."""
ka_dedup_semantic_threshold: float = 0.92
"""Cosine similarity floor for Phase B (semantic) candidate pairs."""
ka_dedup_structural_batch_size: int = 200
"""Max cross-label name collisions resolved per Phase A cycle."""
ka_dedup_semantic_batch_size: int = 50
"""Max LLM synthesis calls issued per Phase B cycle."""
ka_dedup_llm_retry_limit: int = 2
"""Max LLM retries per candidate pair before the pair is skipped."""
ka_dedup_neighbourhood_limit: int = 10
"""Max relationship samples fetched per entity for the synthesis prompt."""
# Organization Worker
ka_org_enabled: bool = True
"""Master toggle for the background KG organization worker."""
ka_org_interval_seconds: int = 600
"""How often (seconds) the organization cycle runs. Default: 10 minutes."""
ka_org_hub_degree_threshold: int = 2
"""Relationship degree above which a node is considered a hub. Default: 2."""
ka_org_max_hubs_per_cycle: int = 5
"""Max number of hubs evaluated per wake cycle."""
ka_org_max_llm_calls_per_cycle: int = 50
"""Max LLM calls allowed in a single organization cycle to avoid rate limits."""
ka_org_llm_delay_seconds: float = 0.5
"""Sleep delay between LLM calls in the organization loop to avoid rate limits."""
# Accumulation Merge
ka_accum_merge_enabled: bool = True
"""Enable description accumulation and LLM consolidation for same-name+label entity hits."""
ka_accum_merge_threshold: int = 10
"""Number of accumulated description segments before LLM consolidation fires. Default: 10."""
ka_accum_merge_llm_retry_limit: int = 2
"""Max LLM retries for consolidation before falling back to keeping the accumulated text."""
ka_accum_merge_reembed_interval: int = 3
"""Re-embed the entity's accumulated description every N appends (0 = only on consolidation)."""
# --- Filesystem / Prompt Context ---
dir_tree_extra_roots: list[str] = field(default_factory=list)
"""Additional absolute filesystem paths to include in the directory tree injected
into the system prompt. Set in config.yaml instead of hardcoding paths in source."""
@property
def openrouter_api_key(self) -> str:
    """Legacy read alias for :attr:`api_key`.

    Code written against the older ``openrouter_api_key`` attribute keeps
    working through this property, and no second copy of the secret is
    stored. Has no side effects.

    Returns:
        str: The current value of :attr:`api_key`.
    """
    return self.api_key

@openrouter_api_key.setter
def openrouter_api_key(self, value: str) -> None:
    """Legacy write alias for :attr:`api_key`.

    Assigning ``cfg.openrouter_api_key = ...`` stores the value on the
    canonical :attr:`api_key` field instead of shadowing it.

    Args:
        value (str): The key to store on :attr:`api_key`.

    Returns:
        None
    """
    self.api_key = value
@property
def API_KEYS(self) -> dict:
    """Legacy upper-case alias for the :attr:`api_keys` mapping.

    Tool code that still reads ``config.API_KEYS[...]`` is served the very
    same dict object stored in :attr:`api_keys` (not a copy), so writes made
    through either name are visible through both.

    Returns:
        dict: The live :attr:`api_keys` mapping of service name to credential.
    """
    return self.api_keys
@property
def configured_platforms(self) -> list[str]:
    """Return the enabled platform names as a sorted, lowercase list.

    Scans :attr:`platforms`, keeping entries that are ``enabled`` and have a
    non-empty ``type``. Each type is lowercased and the self-bot variant
    ``"discord-self"`` is reported as ``"discord"``. The fallback names
    ``discord``, ``matrix`` and ``webchat`` are always included for backward
    compatibility and tests, so the result is a superset of what is strictly
    configured.

    The list is sorted so the order is stable across processes; a bare
    ``list(set)`` varies with string hash randomisation.

    Returns:
        list[str]: Unique lowercase platform identifiers in ascending order.
    """
    # Fallback defaults are always present.
    names = {"discord", "matrix", "webchat"}
    for platform in self.platforms:
        if platform.enabled and platform.type:
            name = platform.type.lower()
            # Collapse the self-bot variant onto the canonical Discord name.
            names.add("discord" if name == "discord-self" else name)
    return sorted(names)
[docs]
def resolved_journal_units(self) -> list[str]:
    """Return the systemd units the admin journal should tail.

    An explicit :attr:`journal_systemd_units` setting wins and is returned as
    a fresh list. When it is empty or unset, the units come from
    ``core.control_ops.fleet_units(self)``, so the default follows the fleet
    definition and this deployment's unit-name overrides.

    ``core.control_ops`` is imported lazily, inside the fallback branch, so
    that loading this module does not depend on it.

    Returns:
        list[str]: A copy of the configured units, or the fleet-derived
            units when none are configured.
    """
    explicit_units = self.journal_systemd_units
    if not explicit_units:
        # Deferred import keeps ``config`` free of a load-time dependency.
        from core.control_ops import fleet_units

        return fleet_units(self)
    return list(explicit_units)
# Class-level field defaults: feature toggles, proactive responses, the
# channel heartbeat, and background-scheduler switches.
# --- LLM quality filter ---
llm_filter_enabled: bool = False
# --- RLHF anti-pattern guardrail (inverted safety) ---
rlhf_guardrail_enabled: bool = True
# --- Ego ablation global killswitch ---
ego_ablation_enabled: bool = False
# --- Proactive responses ---
proactive_enabled: bool = False
# NOTE(review): presumably a probability in [0.0, 1.0] — confirm against the consumer.
proactive_default_frequency: float = 0.05
proactive_triage_enabled: bool = True
proactive_triage_model: str = "gemini-3.1-flash-lite"
# --- Channel heartbeat (background MRU nudge, flash model) ---
channel_heartbeat_enabled: bool = True
# Interval bounds; the ``_s`` suffix suggests seconds (900 = 15 min,
# 2700 = 45 min) — TODO confirm the unit against the heartbeat loop.
channel_heartbeat_interval_min_s: float = 900.0
channel_heartbeat_interval_max_s: float = 2700.0
channel_heartbeat_tick_s: float = 45.0
channel_heartbeat_max_channels: int = 10
channel_heartbeat_concurrency: int = 3
channel_heartbeat_model: str = "gemini-3-flash"
# --- Background scheduler: periodic OpenRouter chat/completion tasks ---
background_scheduler_chat_llm_enabled: bool = True
background_scheduler_log_rag_ingest_enabled: bool = False
legacy_kg_extraction: bool = False
"""Whether the legacy auto_kg_extraction task is active. Disabled by default
since Knowledge Anchoring does the same thing, but in real time."""
# --- NCM: full disable per channel (platform:channel_id); see feature_toggles ---
# ``default_factory`` builds a fresh frozenset from the module-level
# ``DEFAULT_NCM_FULLY_DISABLED_CHANNELS`` constant for each instance.
ncm_fully_disabled_channels: frozenset[str] = field(
default_factory=lambda: frozenset(DEFAULT_NCM_FULLY_DISABLED_CHANNELS),
)
# Class-level field defaults: message batching, on-disk storage locations,
# the media cache, and visual-memory tuning.
# --- Message batching ---
# NOTE(review): unit of ``batch_window`` is not visible here; presumably seconds — confirm.
batch_window: float = 5.0
max_batch_size: int = 10
# --- Threadweave ---
dna_vault_path: str = "data/dna_vault"
# --- API key encryption (per-user keys in SQLite) ---
api_key_encryption_db_path: str = "data/api_key_encryption_keys.db"
# --- Media cache ---
media_cache_dir: str = "media_cache"
# Size cap; the ``_mb`` suffix suggests megabytes — TODO confirm against the cache code.
media_cache_max_mb: int = 500
media_download_retry_attempts: int = 3
"""Total attempts (1 = no retry) for a CDN attachment/emoji download before
giving up. A transient Discord/Matrix CDN blip otherwise drops the
attachment for that one message; bounded retry closes that one-shot loss."""
# --- Visual Memory (cross-channel image pattern recognition) --- # 👀
visual_memory_enabled: bool = True
"""Master toggle for the visual memory graph. When False, no image
processing occurs and no VisualEntity nodes are created."""
visual_memory_face_threshold: float = 0.65
"""Cosine similarity threshold for face identity matching. ArcFace
embeddings typically score 0.5-0.7 for same-person matches; 0.65
is conservative enough to avoid false positives on CPU models."""
visual_memory_object_threshold: float = 0.75
"""Cosine similarity threshold for object/scene matching via SigLIP.
Higher than face threshold because object identity is less precise."""
visual_memory_insightface_model: str = "buffalo_sc"
"""InsightFace model pack. ``buffalo_sc`` is CPU-friendly (small);
``buffalo_l`` is more accurate but heavier. Both use ArcFace 512d."""
visual_memory_siglip_model: str = "google/siglip-so400m-patch14-224"
"""HuggingFace SigLIP model for general object/scene embeddings.
``so400m-patch14-224`` is the medium variant; CPU-manageable."""
visual_memory_max_entities_per_image: int = 10
"""Max number of face/object entities to extract from a single image."""
visual_memory_cache_ttl_seconds: int = 300
"""How long (seconds) recognition results stay in Redis for context
injection. 5 minutes covers the typical conversation window."""
visual_memory_min_sightings_to_report: int = 2
"""Minimum sighting count before Star reports recognizing an entity.
Set to 2 so one-off sightings aren't mentioned (avoids noise)."""
visual_memory_text_density_threshold: float = 0.15
"""Canny edge density ratio above which an image is considered
text-heavy (screenshots, code, documents) and skipped for visual
entity recognition. Combined with color variance and histogram
bimodality checks to avoid false positives on textured photos.
Range 0.0-1.0; lower = more aggressive filtering."""
# Class-level field defaults: per-user sandbox storage/quota settings, the
# Tor sidecar used for sandboxed downloads, and emoji resolution limits.
# --- Per-user LLM sandboxes (filesystem + Tor-enforced code execution) ---
user_sandboxes_dir: str = "data/user_sandboxes"
"""Root directory for ``{user_id}/workspace`` sandbox trees."""
# 1024**3 bytes = 1 GiB per user.
user_sandbox_quota_bytes: int = 1024**3
"""Max total size (bytes) of regular files per user under ``user_sandboxes_dir/{user_id}/``."""
user_sandbox_quota_mode: str = "auto"
"""``python`` = app-level byte counting; ``loopback`` = per-user ext4 loop image (needs Linux+root); ``auto`` = try loopback then Python."""
user_sandbox_loopback_dir: str = "data/user_sandbox_loopback"
"""Sparse disk images for loopback sandboxes (see docs/USER_SANDBOX_TOR.md)."""
user_sandbox_loopback_index_path: str = "data/user_sandbox_loopback_index.json"
"""JSON map of user_id to image and mount paths for remount-after-reboot."""
user_sandbox_remount_on_startup: bool = True
"""If True, attempt to remount loopback sandboxes when the bot starts (Linux + root)."""
tor_gateway_container: str = "stargazer-tor-gateway"
"""Docker container name for ``docker run --network container:...`` Tor sidecar."""
sandbox_curl_image: str = "curlimages/curl:8.11.1"
"""Image for HTTPS downloads into a user sandbox over the Tor netns."""
# --- Emoji resolution ---
resolve_emojis_as_images: bool = True
max_emojis_per_message: int = 5
# Class-level field defaults: web GUI binding, admin identities, service /
# repo names for admin commands, and fleet-wide control-op timing.
# --- Web GUI ---
# Both listeners default to the loopback address (127.0.0.1).
web_host: str = "127.0.0.1"
web_port: int = 8080
redis_platform_admin_host: str = "127.0.0.1"
redis_platform_admin_port: int = 8081
# Explicit systemd units for the admin journal WebSocket (journalctl -u …).
# Empty (the default) means "tail every fleet tier", resolved at read time by
# :meth:`resolved_journal_units` from ``core.control_ops.SERVICE_TIERS`` so the
# default tracks the live microservices (stargazer-gateway / -inference /
# -agents / -consolidation / -web) and honours any ``control_unit_prefix`` /
# ``control_unit_names`` overrides instead of hardcoding stale unit names.
journal_systemd_units: list[str] = field(default_factory=list)
# --- Admin user IDs (bypass privilege escalation, access admin UI) ---
admin_user_ids: list[str] = field(default_factory=list)
# --- Shell tool legacy allowlist (see tools/shell_tool.py); prefer UNSANDBOXED_EXEC ---
shell_authorized_user_ids: list[str] = field(default_factory=list)
# --- Admin ops: service names and repo path (used by !bot_restart / !proxy_restart / !bot_pull) ---
bot_service_name: str = "stargazer-inference"
"""systemd unit for the back-compat ``!bot_restart`` (now an alias for
``!restart_inference``). The old monolithic ``stargazer`` unit no longer
exists post-T3; the inference tier is the closest analogue."""
proxy_service_name: str = "gemini-cli-proxy"
"""systemd service name for ``!proxy_restart`` (default: ``gemini-cli-proxy``)."""
bot_repo_path: str = ""
"""Absolute path to the bot git repo for ``!bot_pull``. Empty = current working directory."""
# --- Cluster control-ops (fleet-wide !restart_* / !bot_pull over sg:control:ops) ---
control_unit_prefix: str = "stargazer-"
"""Prefix for deriving a service's systemd unit name: ``{prefix}{service_name}``
(e.g. ``stargazer-gateway``). Overridden per-service by ``control_unit_names``."""
control_unit_names: dict[str, str] = field(default_factory=dict)
"""Optional explicit map of ``service_name -> systemd unit`` for non-standard
deployments. Falls back to ``control_unit_prefix + service_name`` when unset."""
control_proxy_handler_service: str = "gateway"
"""Which service tier issues ``!proxy_restart`` so the external proxy unit is
restarted exactly once (not once per service that hears the control op)."""
# The gateway grace (8.0 s) is deliberately longer than the per-service
# grace (2.0 s) below, matching the "gateway restarts last" intent.
control_gateway_restart_grace: float = 8.0
"""Seconds the gateway waits before self-restarting, so ACKs/replies flush and
it restarts strictly LAST in a fleet-wide restart."""
control_service_restart_grace: float = 2.0
"""Seconds a non-gateway service waits before self-restarting (lets it RPUSH
its ack to the reply channel first)."""
# NOTE(review): unit is presumably seconds, like the neighbouring grace values — confirm.
control_reply_timeout: float = 3.0
"""How long the gateway publisher aggregates per-service acks before reporting."""
control_pull_lock_ttl: int = 90
"""TTL (seconds) for the per-host/repo ``sg:control:pull`` dedupe lock."""
# 21600 s = 6 h.
command_sync_cooldown_seconds: int = 21600
"""Min seconds between gateway slash-command tree syncs (Redis NX cooldown).
Boot-time auto-sync re-registers commands without tripping Discord's
rate-limit penalty box. Default 6h. Manual ``!sync_tree`` always bypasses it."""
# Class-level field defaults: pre-inference timeouts and per-channel /
# cross-channel semantic recall tuning.
# --- Prompt / pre-inference timeouts (seconds; 0 = no cap where supported) ---
# 900 s = 15 min; 600 s = 10 min.
prompt_context_build_timeout_seconds: float = 900.0
preinference_context_shield_timeout_seconds: float = 600.0
# Soft deadline for parallel pre-inference asyncio.wait (classify, skills, context, …).
preinference_gather_timeout_seconds: float = 100.0
batch_preprocess_shield_timeout_seconds: float = 30.0
# How long inference waits for a media-bearing user turn (attachment OR an
# image URL in the text) to finish download+encode and reach history before
# generating — else the model answers blind and the image surfaces only on
# the next turn. Must exceed url_utils.image's HEAD+GET budget (~23s).
media_preprocess_shield_timeout_seconds: float = 30.0
# NOTE(review): presumably the MAXLEN cap applied to Redis stream writes — confirm at the call site.
redis_stream_maxlen: int = 100_000
# --- Per-channel semantic recall (Redis KNN + zset ±neighbors, XML on user message) ---
channel_semantic_recall_enabled: bool = True
channel_semantic_recall_days: int = 30
channel_semantic_recall_top_k: int = 2
channel_semantic_recall_oversample_factor: int = 5
channel_semantic_recall_neighbor_before: int = 5
channel_semantic_recall_neighbor_after: int = 5
channel_semantic_recall_max_total_chars: int = 12000
channel_semantic_recall_max_window_chars: int = 4000
channel_semantic_recall_min_similarity: float = 0.65
channel_semantic_recall_timeout_seconds: float = 12.0
# Cross-channel recall: KNN over the speaker's most-recently-used
# channels (excluding the current one). Privacy-by-construction:
# MRU is derived from messages the user has personally sent.
# The cross-channel defaults below are tighter than the same-channel ones
# above: higher similarity floor (0.78 vs 0.65), smaller character budgets
# (6000/2000 vs 12000/4000), and a narrower neighbor window (3 vs 5).
channel_semantic_recall_cross_channel_enabled: bool = True
channel_semantic_recall_cross_channel_top_k_channels: int = 10
channel_semantic_recall_cross_channel_top_k_hits: int = 2
channel_semantic_recall_cross_channel_min_similarity: float = 0.78
channel_semantic_recall_cross_channel_max_total_chars: int = 6000
channel_semantic_recall_cross_channel_max_window_chars: int = 2000
channel_semantic_recall_cross_channel_neighbor_before: int = 3
channel_semantic_recall_cross_channel_neighbor_after: int = 3
channel_semantic_recall_cross_channel_lookback_messages: int = 300
# Class-level field defaults: webhook / OAuth credentials, platform list,
# logging, skills, mcpo, safety classifiers, Starwiki, the attachment guard,
# and the legacy top-level Matrix fields.
# --- Webhook ---
# NOTE(review): secrets and URLs in this group default to "" — presumably
# meaning "not configured"; confirm how consumers treat the empty string.
webhook_secret: str = ""
# --- Admin panel canonical URL ---
admin_panel_base_url: str = ""
session_cookie_domain: str = ""
# --- Discord OAuth2 (web UI authentication) ---
discord_oauth_client_id: str = ""
discord_oauth_client_secret: str = ""
discord_oauth_redirect_uri: str = ""
# --- OAuth2 token management (per-user service connections) ---
oauth_encryption_key: str = ""
oauth_base_url: str = ""
oauth_providers: dict[str, dict[str, Any]] = field(default_factory=dict)
# --- Per-platform configs ---
platforms: list[PlatformConfig] = field(default_factory=list)
# --- Dynamic Logging configuration ---
log_level: str = "INFO"
structured_logging: bool = False
# --- Agent Skills (vector catalog + activate_skill) ---
skills_enabled: bool = False
skills_corpus_roots: list[str] = field(default_factory=list)
skills_index_db: str = "data/skills_index.db"
skills_top_k: int = 12
skills_similarity_threshold: float = 0.12
skills_catalog_max_chars: int = 4000
# --- mcpo: MCP-to-OpenAPI proxy (Docker Compose service + tools/mcpo_proxy_tools.py) ---
mcpo_enabled: bool = False
mcpo_base_url: str = ""
mcpo_api_key: str = ""
mcpo_config_path: str = "data/mcpo_servers.json"
# --- Dangerous-command embedding hint (RediSearch KNN on user message) ---
dangerous_command_warning_enabled: bool = True
# On Redis / index / KNN errors: "open" = fail-open (no suffix), "warn"/"closed" = inject warning suffix.
dangerous_command_guard_fail_mode: str = "open"
dangerous_command_similarity_threshold: float = 0.8
# Min gap (danger_sim - benign_sim) when idx:benign_tech has docs; 0 => require d > b only.
dangerous_command_benign_margin: float = 0.0
# --- Tool vector classifier tuning ---
tool_similarity_threshold: float = 0.30
tool_top_k: int = 15
tool_strategy_force_threshold: float = 0.80
tool_strategy_optional_threshold: float = 0.30
tool_group_expansion_threshold: float = 0.55
# Stricter floor for ``browser_*`` tools (rarely used; noisy retrieval).
tool_browser_similarity_threshold: float = 0.60
# --- Starwiki (OAuth-gated markdown wikis + bot tools + per-wiki RAG) ---
# These defaults are the fallbacks read by ``_parse_starwiki_config``.
starwiki_enabled: bool = False
starwiki_root: str = "data/starwiki"
starwiki_worker_model: str = "gemini-3-flash"
starwiki_lint_interval_minutes: int = 60
starwiki_git_author: str = "starwiki-bot"
starwiki_git_author_email: str = "starwiki@local"
starwiki_max_source_mb: int = 25
starwiki_ingest_concurrency: int = 2
starwiki_allow_public_wiki_edit: bool = True
starwiki_rag_auto_index: bool = True
starwiki_scheduled_lint_includes_public: bool = True
# --- Attachment guard (unreadable text inlined from user attachments) ---
# These defaults are the fallbacks read by ``_parse_attachment_guard_config``.
attachment_guard_unreadable_truncation_enabled: bool = True
attachment_guard_unreadable_truncation_max_chars: int = 2000
attachment_guard_unreadable_truncation_ascii_threshold: float = 0.05
attachment_guard_unreadable_truncation_lang_confidence: float = 0.5
attachment_guard_unreadable_truncation_sample_chars: int = 4096
# Min share of Unicode letters (L*); below this, treat as emoji/symbol spam.
attachment_guard_unreadable_truncation_min_letter_ratio: float = 0.05
# Near-max Shannon entropy catches encrypted/base64/hex/random ASCII payloads.
attachment_guard_unreadable_truncation_entropy_normalized_threshold: float = 0.98
attachment_guard_unreadable_truncation_entropy_min_bits_per_char: float = 3.9
attachment_guard_unreadable_truncation_entropy_min_chars: int = 1024
# ---- Legacy top-level Matrix fields (kept for backward compat) ----
homeserver: str = "https://matrix.org"
user_id: str = ""
password: str = ""
store_path: str = "nio_store"
credentials_file: str = "credentials.json"
@classmethod
def _parse_kg_config(cls, data: dict) -> dict:
    """Flatten the ``knowledge_graph`` YAML block into ``kg_*`` field kwargs.

    Pulls the nested ``knowledge_graph`` mapping out of the parsed YAML and
    coerces each option (hop limits, seed top-k and similarity thresholds,
    dynamic-threshold relaxation, per-category candidate caps, edge weights,
    decay factors, extraction flags, and the channel hints / full-memory
    user lists) into the flat, typed ``kg_*`` keyword arguments the
    :class:`Config` constructor expects.

    A key that is absent *or* explicitly null in YAML (``max_hops:`` with no
    value) falls back to the class-level default on *cls*, so a blank key
    neither crashes ``int()``/``float()`` nor silently flips a boolean.
    Non-dict sub-values are normalised to empty mappings; ``None`` keys,
    values, and ids are dropped from the hints map and the user-id list.

    A pure data-shaping helper invoked by :meth:`Config.load` (spread as
    ``**cls._parse_kg_config(data)``); it reads only the in-memory *data*
    dict and *cls* attributes.

    Args:
        data (dict): The full parsed ``config.yaml`` mapping.

    Returns:
        dict: Keyword arguments keyed by ``kg_*`` field name, ready to splat
        into the :class:`Config` constructor.

    Raises:
        ValueError: If a supplied value cannot be parsed as its field type
            (e.g. ``max_hops: "abc"``).
        TypeError: If a supplied value has an uncoercible type (e.g. a list
            for a numeric field).
    """
    kg = data.get("knowledge_graph")
    if not isinstance(kg, dict):
        kg = {}

    def default_if_null(field_name: str, key: str) -> object:
        # Missing key and explicit YAML null both mean "use the default".
        value = kg.get(key)
        return getattr(cls, field_name) if value is None else value

    hints_raw = kg.get("extraction_channel_hints")
    if not isinstance(hints_raw, dict):
        hints_raw = {}
    hints: dict[str, str] = {
        str(channel): str(hint)
        for channel, hint in hints_raw.items()
        if channel is not None and hint is not None
    }

    ids_raw = kg.get("full_user_memory_ids")
    if isinstance(ids_raw, list):
        stripped = (str(uid).strip() for uid in ids_raw if uid is not None)
        full_user_ids = [uid for uid in stripped if uid]
    else:
        # Resolved lazily: only built when YAML gives no usable list.
        full_user_ids = list(DEFAULT_KG_FULL_USER_MEMORY_IDS)

    # (Config field, YAML key, coercion)
    typed_fields = (
        ("kg_max_hops", "max_hops", int),
        ("kg_seed_top_k", "seed_top_k", int),
        ("kg_seed_limit", "seed_limit", int),
        ("kg_seed_similarity_threshold", "seed_similarity_threshold", float),
        (
            "kg_seed_dynamic_threshold_enabled",
            "seed_dynamic_threshold_enabled",
            bool,
        ),
        (
            "kg_seed_dynamic_threshold_target_ratio",
            "seed_dynamic_threshold_target_ratio",
            float,
        ),
        ("kg_seed_dynamic_threshold_min", "seed_dynamic_threshold_min", float),
        (
            "kg_seed_dynamic_threshold_min_stored",
            "seed_dynamic_threshold_min_stored",
            int,
        ),
        ("kg_user_seed_min", "user_seed_min", int),
        ("kg_user_candidate_limit", "user_candidate_limit", int),
        ("kg_lore_candidate_limit", "lore_candidate_limit", int),
        ("kg_lore_seed_min", "lore_seed_min", int),
        ("kg_meta_candidate_limit", "meta_candidate_limit", int),
        ("kg_meta_seed_min", "meta_seed_min", int),
        ("kg_recent_speaker_limit", "recent_speaker_limit", int),
        ("kg_min_edge_weight", "min_edge_weight", float),
        ("kg_default_edge_weight", "default_edge_weight", float),
        ("kg_retrieval_hop_decay", "retrieval_hop_decay", float),
        ("kg_expansion_neighbor_limit", "expansion_neighbor_limit", int),
        ("kg_max_context_entities", "max_context_entities", int),
        ("kg_entity_dedup_threshold", "entity_dedup_threshold", float),
        ("kg_relationship_decay_factor", "relationship_decay_factor", float),
        ("kg_per_message_extraction", "per_message_extraction", bool),
        # The YAML key is longer than the field name for this one option.
        ("kg_min_message_length", "min_message_length_for_extraction", int),
        ("kg_per_user_extraction_limit", "per_user_extraction_limit", int),
    )

    # The extraction model is passed through without coercion.
    parsed: dict = {
        "kg_extraction_model": default_if_null(
            "kg_extraction_model", "extraction_model"
        ),
    }
    for field_name, key, cast in typed_fields:
        parsed[field_name] = cast(default_if_null(field_name, key))
    parsed["kg_full_user_memory_ids"] = full_user_ids
    parsed["kg_extraction_channel_hints"] = hints
    return parsed
@classmethod
def _parse_mementropic_config(cls, data: dict) -> dict:
    """Flatten the ``mementropic`` YAML block into ``mementropic_*`` kwargs.

    Reads the nested ``mementropic`` mapping (the NCM ledger plus late-fusion
    and reconsolidation tuning) from the parsed YAML and coerces each option
    — late-fusion toggle, the semantic/resonance blend weights, ledger size
    cap, and the reconsolidation learning-rate / step / entity bounds — to
    the type of its flat ``mementropic_*`` field on :class:`Config`.

    A key that is absent *or* explicitly null in YAML (``semantic_weight:``
    with no value) falls back to the class-level default on *cls*; a
    non-mapping ``mementropic`` value is treated as empty.

    A pure data-shaping helper called by :meth:`Config.load` (spread as
    ``**cls._parse_mementropic_config(data)``); it reads only the in-memory
    *data* dict and *cls* attributes.

    Args:
        data (dict): The full parsed ``config.yaml`` mapping.

    Returns:
        dict: Keyword arguments keyed by ``mementropic_*`` field name for the
        :class:`Config` constructor.

    Raises:
        ValueError: If a supplied value cannot be parsed as its field type.
        TypeError: If a supplied value has an uncoercible type.
    """
    section = data.get("mementropic")
    if not isinstance(section, dict):
        section = {}

    # (Config field, YAML key, coercion)
    spec = (
        ("mementropic_late_fusion_enabled", "late_fusion_enabled", bool),
        ("mementropic_semantic_weight", "semantic_weight", float),
        ("mementropic_resonance_weight", "resonance_weight", float),
        ("mementropic_ledger_max_entries", "ledger_max_entries", int),
        ("mementropic_reconsolidation_enabled", "reconsolidation_enabled", bool),
        (
            "mementropic_reconsolidation_learning_rate",
            "reconsolidation_learning_rate",
            float,
        ),
        (
            "mementropic_reconsolidation_max_step",
            "reconsolidation_max_step",
            float,
        ),
        (
            "mementropic_reconsolidation_max_entities",
            "reconsolidation_max_entities",
            int,
        ),
    )

    parsed: dict = {}
    for field_name, key, cast in spec:
        value = section.get(key)
        if value is None:
            # Missing key or explicit YAML null: use the class default.
            value = getattr(cls, field_name)
        parsed[field_name] = cast(value)
    return parsed
@classmethod
def _parse_persona_pref_config(cls, data: dict) -> dict:
    """Flatten the ``persona_preferences`` YAML block into ``persona_*`` kwargs.

    Reads the nested ``persona_preferences`` mapping (the persona preference
    memory subsystem) from the parsed YAML and coerces each option — the
    master and extraction toggles, extraction rate limit and minimum
    response length, injection count/character budgets, and the base persona
    id — to the type of its flat ``persona_*`` field on :class:`Config`.

    A key that is absent *or* explicitly null in YAML falls back to the
    class-level default on *cls*. In particular a blank ``base_persona_id:``
    yields the default id rather than the literal string ``"None"``. A
    non-mapping ``persona_preferences`` value is treated as empty.

    A pure data-shaping helper called by :meth:`Config.load` (spread as
    ``**cls._parse_persona_pref_config(data)``); it reads only the in-memory
    *data* dict and *cls* attributes.

    Args:
        data (dict): The full parsed ``config.yaml`` mapping.

    Returns:
        dict: Keyword arguments keyed by ``persona_*`` field name for the
        :class:`Config` constructor.

    Raises:
        ValueError: If a supplied value cannot be parsed as its field type.
        TypeError: If a supplied value has an uncoercible type.
    """
    section = data.get("persona_preferences")
    if not isinstance(section, dict):
        section = {}

    # (Config field, YAML key, coercion). Note the master toggle uses the
    # ``persona_preferences_`` prefix while the rest use ``persona_pref_``.
    spec = (
        ("persona_preferences_enabled", "enabled", bool),
        ("persona_pref_extraction_enabled", "extraction_enabled", bool),
        (
            "persona_pref_extraction_rate_limit_seconds",
            "extraction_rate_limit_seconds",
            int,
        ),
        (
            "persona_pref_extraction_min_response_length",
            "extraction_min_response_length",
            int,
        ),
        ("persona_pref_injection_max_count", "injection_max_count", int),
        ("persona_pref_injection_max_chars", "injection_max_chars", int),
        ("persona_pref_base_persona_id", "base_persona_id", str),
    )

    parsed: dict = {}
    for field_name, key, cast in spec:
        value = section.get(key)
        if value is None:
            # Missing key or explicit YAML null: use the class default.
            value = getattr(cls, field_name)
        parsed[field_name] = cast(value)
    return parsed
@classmethod
def _parse_starwiki_config(cls, data: dict) -> dict:
    """Flatten the ``starwiki`` YAML block into ``starwiki_*`` field kwargs.

    Reads the nested ``starwiki`` mapping (the OAuth-gated markdown wiki
    subsystem) from the parsed YAML and coerces each option — the enable
    toggle, on-disk root, lint worker model and interval, git author
    identity, source size cap, ingest concurrency, public-edit and RAG
    auto-index flags, and whether scheduled lints include public wikis — to
    the type of its flat ``starwiki_*`` field on :class:`Config`.

    A key that is absent *or* explicitly null in YAML falls back to the
    class-level default on *cls*. In particular a blank ``root:`` or
    ``git_author:`` yields the default rather than the literal string
    ``"None"``. A non-mapping ``starwiki`` value is treated as empty.

    A pure data-shaping helper called by :meth:`Config.load` (spread as
    ``**cls._parse_starwiki_config(data)``); it reads only the in-memory
    *data* dict and *cls* attributes.

    Args:
        data (dict): The full parsed ``config.yaml`` mapping.

    Returns:
        dict: Keyword arguments keyed by ``starwiki_*`` field name for the
        :class:`Config` constructor.

    Raises:
        ValueError: If a supplied value cannot be parsed as its field type.
        TypeError: If a supplied value has an uncoercible type.
    """
    section = data.get("starwiki")
    if not isinstance(section, dict):
        section = {}

    # (Config field, YAML key, coercion)
    spec = (
        ("starwiki_enabled", "enabled", bool),
        ("starwiki_root", "root", str),
        ("starwiki_worker_model", "worker_model", str),
        ("starwiki_lint_interval_minutes", "lint_interval_minutes", int),
        ("starwiki_git_author", "git_author", str),
        ("starwiki_git_author_email", "git_author_email", str),
        ("starwiki_max_source_mb", "max_source_mb", int),
        ("starwiki_ingest_concurrency", "ingest_concurrency", int),
        ("starwiki_allow_public_wiki_edit", "allow_public_wiki_edit", bool),
        ("starwiki_rag_auto_index", "rag_auto_index", bool),
        (
            "starwiki_scheduled_lint_includes_public",
            "scheduled_lint_includes_public",
            bool,
        ),
    )

    parsed: dict = {}
    for field_name, key, cast in spec:
        value = section.get(key)
        if value is None:
            # Missing key or explicit YAML null: use the class default.
            value = getattr(cls, field_name)
        parsed[field_name] = cast(value)
    return parsed
@classmethod
def _parse_vector_store_config(cls, data: dict) -> dict:
    """Flatten the ``vector_store`` YAML block into ``vector_pg_*`` kwargs.

    Reads the nested ``vector_store`` mapping (the Postgres + pgvector
    backend for all RAG / memory vector storage) from the parsed YAML and
    coerces each option — the full DSN override plus the discrete host,
    port, database, user, password, sslmode, and connection-pool min/max
    sizes — to the type of its flat ``vector_pg_*`` field on :class:`Config`.

    A key that is absent *or* explicitly null in YAML falls back to the
    class-level default on *cls*. This matters most for the string options:
    a blank ``dsn:`` or ``password:`` yields the default instead of the
    literal string ``"None"``. A non-mapping ``vector_store`` value is
    treated as empty.

    A pure data-shaping helper called by :meth:`Config.load` (spread as
    ``**cls._parse_vector_store_config(data)``); it reads only the in-memory
    *data* dict and *cls* attributes.

    Args:
        data (dict): The full parsed ``config.yaml`` mapping.

    Returns:
        dict: Keyword arguments keyed by ``vector_pg_*`` field name for the
        :class:`Config` constructor.

    Raises:
        ValueError: If a supplied value cannot be parsed as its field type.
        TypeError: If a supplied value has an uncoercible type.
    """
    section = data.get("vector_store")
    if not isinstance(section, dict):
        section = {}

    # (Config field, YAML key, coercion)
    spec = (
        ("vector_pg_dsn", "dsn", str),
        ("vector_pg_host", "host", str),
        ("vector_pg_port", "port", int),
        ("vector_pg_database", "database", str),
        ("vector_pg_user", "user", str),
        ("vector_pg_password", "password", str),
        ("vector_pg_sslmode", "sslmode", str),
        ("vector_pg_min_size", "min_size", int),
        ("vector_pg_max_size", "max_size", int),
    )

    parsed: dict = {}
    for field_name, key, cast in spec:
        value = section.get(key)
        if value is None:
            # Missing key or explicit YAML null: use the class default.
            value = getattr(cls, field_name)
        parsed[field_name] = cast(value)
    return parsed
@classmethod
def _parse_attachment_guard_config(cls, data: dict) -> dict:
    """Flatten ``attachment_guard.unreadable_truncation`` YAML into kwargs.

    Drills into the nested ``attachment_guard`` then
    ``unreadable_truncation`` mappings (the guard that truncates unreadable
    text inlined from user attachments) and coerces each option — the enable
    toggle, max/sample character caps, ASCII and language-confidence
    thresholds, minimum letter ratio, and the Shannon-entropy gating
    parameters — to the type of its flat
    ``attachment_guard_unreadable_truncation_*`` field on :class:`Config`.

    A key that is absent *or* explicitly null in YAML falls back to the
    class-level default on *cls*; a non-mapping value at either nesting
    level is treated as empty.

    A pure data-shaping helper called by :meth:`Config.load` (spread as
    ``**cls._parse_attachment_guard_config(data)``); it reads only the
    in-memory *data* dict and *cls* attributes.

    Args:
        data (dict): The full parsed ``config.yaml`` mapping.

    Returns:
        dict: Keyword arguments keyed by
        ``attachment_guard_unreadable_truncation_*`` field name for the
        :class:`Config` constructor.

    Raises:
        ValueError: If a supplied value cannot be parsed as its field type.
        TypeError: If a supplied value has an uncoercible type.
    """
    guard = data.get("attachment_guard")
    if not isinstance(guard, dict):
        guard = {}
    section = guard.get("unreadable_truncation")
    if not isinstance(section, dict):
        section = {}

    # (Config field, YAML key, coercion)
    spec = (
        ("attachment_guard_unreadable_truncation_enabled", "enabled", bool),
        ("attachment_guard_unreadable_truncation_max_chars", "max_chars", int),
        (
            "attachment_guard_unreadable_truncation_ascii_threshold",
            "ascii_threshold",
            float,
        ),
        (
            "attachment_guard_unreadable_truncation_lang_confidence",
            "lang_confidence",
            float,
        ),
        (
            "attachment_guard_unreadable_truncation_sample_chars",
            "sample_chars",
            int,
        ),
        (
            "attachment_guard_unreadable_truncation_min_letter_ratio",
            "min_letter_ratio",
            float,
        ),
        (
            "attachment_guard_unreadable_truncation_entropy_normalized_threshold",
            "entropy_normalized_threshold",
            float,
        ),
        (
            "attachment_guard_unreadable_truncation_entropy_min_bits_per_char",
            "entropy_min_bits_per_char",
            float,
        ),
        (
            "attachment_guard_unreadable_truncation_entropy_min_chars",
            "entropy_min_chars",
            int,
        ),
    )

    parsed: dict = {}
    for field_name, key, cast in spec:
        value = section.get(key)
        if value is None:
            # Missing key or explicit YAML null: use the class default.
            value = getattr(cls, field_name)
        parsed[field_name] = cast(value)
    return parsed
@classmethod
def _parse_knowledge_anchoring_config(cls, data: dict) -> dict:
    """Flatten the ``knowledge_anchoring`` YAML block into ``ka_*`` kwargs.

    Parses the multi-level ``knowledge_anchoring`` mapping for the Limbic
    Anchoring subsystem — its ``worker``, ``gates`` (including the catch-up
    relaxation set), ``sunset``, ``gc``, ``dedup``, and ``org``
    sub-sections — and coerces every option to the type of its flat ``ka_*``
    field on :class:`Config`.

    A key that is absent *or* explicitly null in YAML falls back to the
    class-level default on *cls* (so a blank ``batch_size:`` no longer
    crashes ``int()`` and a blank ``fast_path_prefix:`` does not become the
    string ``"None"``). Non-mapping sections are treated as empty. The
    significant-tools whitelist falls back to a built-in default list when
    the YAML value is missing or not a list, and the sunset window bounds
    are validated so ``window_end_days`` is strictly less than
    ``window_start_days``.

    A pure data-shaping helper called by :meth:`Config.load` (spread as
    ``**cls._parse_knowledge_anchoring_config(data)``) and exercised
    directly by ``tests/test_extraction_llm_timeout.py`` and
    ``tests/test_catchup_gate.py``. It reads only the in-memory *data* dict
    and *cls* attributes.

    Args:
        data (dict): The full parsed ``config.yaml`` mapping.

    Returns:
        dict: Keyword arguments keyed by ``ka_*`` field name for the
        :class:`Config` constructor.

    Raises:
        ValueError: If ``sunset.window_end_days`` is greater than or equal
            to ``sunset.window_start_days`` (a degenerate sunset window), or
            if a supplied value cannot be parsed as its field type.
        TypeError: If a supplied value has an uncoercible type.
    """
    ka = data.get("knowledge_anchoring")
    if not isinstance(ka, dict):
        ka = {}

    def subsection(name: str) -> dict:
        # Non-mapping sub-sections (e.g. ``gates: true``) count as empty.
        value = ka.get(name)
        return value if isinstance(value, dict) else {}

    def option(source: dict, key: str, field_name: str, cast):
        # Missing key or explicit YAML null: use the class default.
        value = source.get(key)
        if value is None:
            value = getattr(cls, field_name)
        return cast(value)

    worker = subsection("worker")
    gates = subsection("gates")
    sunset = subsection("sunset")
    gc = subsection("gc")
    dedup = subsection("dedup")
    org = subsection("org")

    # Significant tools: YAML list or fallback to production-safe default.
    raw_tools = gates.get("significant_tools")
    if isinstance(raw_tools, list):
        significant_tools = [str(tool) for tool in raw_tools]
    else:
        significant_tools = [
            "search_web",
            "read_file",
            "kg_search_entities",
            "unsandboxed_python_tool",
        ]

    # Guard: window_end_days must be strictly less than window_start_days.
    # Checked before the bulk coercion so a degenerate window is reported
    # ahead of any other bad value.
    window_start = option(
        sunset, "window_start_days", "ka_sunset_window_start_days", int
    )
    window_end = option(
        sunset, "window_end_days", "ka_sunset_window_end_days", int
    )
    if window_end >= window_start:
        raise ValueError(
            f"knowledge_anchoring.sunset.window_end_days ({window_end}) must be "
            f"less than window_start_days ({window_start})"
        )

    # (Config field, source mapping, YAML key, coercion)
    spec = (
        ("ka_enabled", ka, "enabled", bool),
        # Worker
        ("ka_batch_size", worker, "batch_size", int),
        ("ka_sleep_interval_seconds", worker, "sleep_interval_seconds", int),
        (
            "ka_max_anchors_per_channel_per_day",
            worker,
            "max_anchors_per_channel_per_day",
            int,
        ),
        ("ka_dlq_retry_limit", worker, "dlq_retry_limit", int),
        ("ka_backpressure_threshold", worker, "backpressure_threshold", int),
        ("ka_backpressure_recovery", worker, "backpressure_recovery", int),
        ("ka_batch_min_size", worker, "batch_min_size", int),
        ("ka_epoch_size", worker, "epoch_size", int),
        ("ka_fast_path_prefix", worker, "fast_path_prefix", str),
        (
            "ka_extraction_http_timeout_seconds",
            worker,
            "extraction_http_timeout_seconds",
            float,
        ),
        # Gates
        ("ka_length_minimum", gates, "length_minimum", int),
        ("ka_similarity_familiar", gates, "similarity_familiar", float),
        ("ka_similarity_anomaly", gates, "similarity_anomaly", float),
        ("ka_density_threshold", gates, "density_threshold", float),
        (
            "ka_tool_rate_limit_per_user_per_hour",
            gates,
            "tool_rate_limit_per_user_per_hour",
            int,
        ),
        # Catch-up gate (also lives under ``gates``)
        ("ka_catchup_enabled", gates, "catchup_enabled", bool),
        (
            "ka_catchup_similarity_familiar",
            gates,
            "catchup_similarity_familiar",
            float,
        ),
        (
            "ka_catchup_similarity_anomaly",
            gates,
            "catchup_similarity_anomaly",
            float,
        ),
        (
            "ka_catchup_density_threshold",
            gates,
            "catchup_density_threshold",
            float,
        ),
        (
            "ka_catchup_max_anchors_per_day",
            gates,
            "catchup_max_anchors_per_day",
            int,
        ),
        # Sunset (window bounds are handled above)
        ("ka_sunset_interval_seconds", sunset, "interval_seconds", int),
        (
            "ka_sunset_daily_summaries_per_epoch",
            sunset,
            "daily_summaries_per_epoch",
            int,
        ),
        (
            "ka_sunset_token_reduction_threshold",
            sunset,
            "token_reduction_threshold",
            int,
        ),
        ("ka_sunset_hydration_similarity", sunset, "hydration_similarity", float),
        # GC
        ("ka_gc_orphan_age_days", gc, "orphan_age_days", int),
        # Deduplication worker
        ("ka_dedup_enabled", dedup, "enabled", bool),
        ("ka_dedup_interval_seconds", dedup, "interval_seconds", int),
        ("ka_dedup_semantic_threshold", dedup, "semantic_threshold", float),
        ("ka_dedup_structural_batch_size", dedup, "structural_batch_size", int),
        ("ka_dedup_semantic_batch_size", dedup, "semantic_batch_size", int),
        ("ka_dedup_llm_retry_limit", dedup, "llm_retry_limit", int),
        ("ka_dedup_neighbourhood_limit", dedup, "neighbourhood_limit", int),
        # Organization worker
        ("ka_org_enabled", org, "enabled", bool),
        ("ka_org_interval_seconds", org, "interval_seconds", int),
        ("ka_org_hub_degree_threshold", org, "hub_degree_threshold", int),
        ("ka_org_max_hubs_per_cycle", org, "max_hubs_per_cycle", int),
    )

    parsed: dict = {
        field_name: option(source, key, field_name, cast)
        for field_name, source, key, cast in spec
    }
    parsed["ka_significant_tools"] = significant_tools
    parsed["ka_sunset_window_start_days"] = window_start
    parsed["ka_sunset_window_end_days"] = window_end
    return parsed
[docs]
@classmethod
def load(cls, path: str | Path = "config.yaml") -> "Config":
"""Build a fully-resolved :class:`Config` from YAML plus env overrides.
The primary configuration entry point for the whole bot. It reads *path*
(default ``config.yaml``) if present, deep-merges an optional
``data/skills_runtime.yaml`` overlay via :func:`_deep_merge_dict`, fans
the nested sections out through the ``_parse_*`` helpers (knowledge graph,
mementropic, persona prefs, starwiki, vector store, attachment guard,
knowledge anchoring), normalises the platform list / legacy Matrix fields
/ tool permissions / API-key and admin-id lists, constructs the
:class:`Config`, and then applies typed environment-variable overrides
from ``env_map`` (coercing to the field's existing bool/int/float/str
type). Finally it folds in a ``DISCORD_TOKEN`` and per-provider OAuth
credentials from the environment, anchors the Redis mTLS certificate
paths and the mcpo config path relative to the config file's directory,
and warns when ``webhook_secret`` is empty while bound to a public host.
Reads the filesystem (the YAML files and, via ``Path.resolve``, the cert
paths) and ``os.environ``; it calls ``user_llm_config.sanitize_llm_http_url``
on the LLM base URL but opens no Redis/KG/LLM/HTTP connection itself.
Called at startup by every service and script that needs configuration —
``inference_main.py``, ``agents_main.py``, ``consolidation_main.py``,
``background_tasks.py``, ``response_postprocessor.py``, the KG migration
scripts, and others — typically as a bare ``Config.load()``.
Args:
path (str | Path): Path to the YAML config file. Defaults to
``"config.yaml"``; a missing file is tolerated and yields a
defaults-only configuration.
Returns:
Config: A fully populated, env-overridden configuration instance.
Raises:
ValueError: Propagated from :meth:`_parse_knowledge_anchoring_config`
when the sunset window bounds are degenerate.
"""
data: dict = {}
config_path = Path(path)
if config_path.exists():
with open(config_path, "r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
# Optional overlay (e.g. from scripts/skills_corpus_pipeline.py)
runtime_path = Path("data/skills_runtime.yaml")
if runtime_path.exists():
try:
with open(runtime_path, "r", encoding="utf-8") as f:
overlay = yaml.safe_load(f) or {}
if isinstance(overlay, dict):
data = _deep_merge_dict(data, overlay)
except OSError:
pass
# --- Parse platform list (new-style) --------------------------
raw_platforms: list[dict[str, Any]] = data.get("platforms", [])
platform_configs: list[PlatformConfig] = []
for raw in raw_platforms:
ptype = raw.get("type", "")
enabled = raw.get("enabled", True)
settings = {k: v for k, v in raw.items() if k not in ("type", "enabled")}
platform_configs.append(
PlatformConfig(
type=ptype,
enabled=enabled,
settings=settings,
)
)
# --- Legacy top-level Matrix fields ---------------------------
homeserver = data.get("homeserver", cls.homeserver)
user_id_val = data.get("user_id", cls.user_id)
password = data.get("password", cls.password)
store_path = data.get("store_path", cls.store_path)
credentials_file = data.get("credentials_file", cls.credentials_file)
# If no platforms list but legacy Matrix fields are present,
# synthesise a Matrix platform entry for backward compatibility.
if not platform_configs and user_id_val:
platform_configs.append(
PlatformConfig(
type="matrix",
enabled=True,
settings={
"homeserver": homeserver,
"user_id": user_id_val,
"password": password,
"store_path": store_path,
"credentials_file": credentials_file,
},
)
)
# --- Tool permissions --------------------------------------
raw_perms: dict[str, list[str]] = data.get("tool_permissions", {})
tool_permissions = (
{k: [str(uid) for uid in v] for k, v in raw_perms.items()}
if isinstance(raw_perms, dict)
else {}
)
# --- API keys -----------------------------------------------
raw_api_keys = data.get("api_keys", {})
api_keys = dict(raw_api_keys) if isinstance(raw_api_keys, dict) else {}
# Inject API keys from env vars if not in YAML # 🕷️
for env_name, key_name in (
("XAI_API_KEY", "xai"),
("ELEVENLABS_API_KEY", "elevenlabs"),
("SUNO_COOKIE", "suno_cookie"),
("WOLFRAM_ALPHA_APP_ID", "wolfram_alpha"),
("POLLINATIONS_API_KEY", "pollinations"),
):
if key_name not in api_keys:
env_val = os.environ.get(env_name, "")
if env_val:
api_keys[key_name] = env_val
# --- Admin user IDs ----------------------------------------
raw_admin_ids = data.get("admin_user_ids", [])
admin_user_ids = (
[str(uid) for uid in raw_admin_ids]
if isinstance(raw_admin_ids, list)
else []
)
raw_shell_ids = data.get("shell_authorized_user_ids")
if isinstance(raw_shell_ids, list):
shell_authorized_user_ids = [str(uid) for uid in raw_shell_ids]
else:
shell_authorized_user_ids = list(_DEFAULT_SHELL_AUTHORIZED_USER_IDS)
# --- Admin panel base URL -----------------------------------
admin_panel_base_url = str(data.get("admin_panel_base_url", "")).rstrip("/")
session_cookie_domain = str(data.get("session_cookie_domain", "")).strip()
# --- Discord OAuth2 ----------------------------------------
discord_oauth = data.get("discord_oauth", {})
if not isinstance(discord_oauth, dict):
discord_oauth = {}
# Derive redirect_uri from admin_panel_base_url when not explicit.
_oauth_redirect = discord_oauth.get("redirect_uri", "")
if not _oauth_redirect and admin_panel_base_url:
_oauth_redirect = f"{admin_panel_base_url}/auth/callback"
# --- OAuth2 token management --------------------------------
oauth_cfg = data.get("oauth", {})
if not isinstance(oauth_cfg, dict):
oauth_cfg = {}
oauth_providers_raw = oauth_cfg.get("providers", {})
if not isinstance(oauth_providers_raw, dict):
oauth_providers_raw = {}
# --- Proactive responses ----------------------------------
proactive_cfg = data.get("proactive", {})
if not isinstance(proactive_cfg, dict):
proactive_cfg = {}
# --- Channel heartbeat ------------------------------------
ch_hb = data.get("channel_heartbeat", {})
if not isinstance(ch_hb, dict):
ch_hb = {}
# --- Background scheduler (chat LLM periodic tasks) --------
bg_sched = data.get("background_scheduler", {})
if not isinstance(bg_sched, dict):
bg_sched = {}
# --- mcpo (MCP OpenAPI proxy) -----------------------------
mcpo_cfg = data.get("mcpo", {})
if not isinstance(mcpo_cfg, dict):
mcpo_cfg = {}
# --- Agent Skills -----------------------------------------
skills_cfg = data.get("skills", {})
if not isinstance(skills_cfg, dict):
skills_cfg = {}
skills_roots_raw = skills_cfg.get("corpus_roots", [])
if isinstance(skills_roots_raw, list):
skills_corpus_roots = [str(p) for p in skills_roots_raw]
else:
skills_corpus_roots = []
# --- Tool classifier tuning ----------------------------------
tc_cfg = data.get("tool_classifier", {})
if not isinstance(tc_cfg, dict):
tc_cfg = {}
resolved_api_key = (
data.get("api_key") or data.get("openrouter_api_key") or cls.api_key
)
# Create a default instance to retrieve factory-initialized default list overrides safely
default_cfg = cls()
# --- Override lists normalization ---
raw_user_override = data.get("overall_user_id_absolute_override_list")
if isinstance(raw_user_override, list):
overall_user_id_absolute_override_list = [str(x) for x in raw_user_override]
else:
overall_user_id_absolute_override_list = list(default_cfg.overall_user_id_absolute_override_list)
raw_channel_override = data.get("overall_channel_id_absolute_override_list")
if isinstance(raw_channel_override, list):
overall_channel_id_absolute_override_list = [str(x) for x in raw_channel_override]
else:
overall_channel_id_absolute_override_list = list(default_cfg.overall_channel_id_absolute_override_list)
_ncm_list = data.get("ncm_fully_disabled_channels")
if isinstance(_ncm_list, list):
ncm_fully_disabled_channels = frozenset(str(x) for x in _ncm_list)
else:
ncm_fully_disabled_channels = None # use dataclass default
if "top_p" in data:
_raw_tp = data["top_p"]
_top_p: float = cls.top_p if _raw_tp is None else float(_raw_tp)
else:
_top_p = cls.top_p
# --- Redis sentinels normalization ---
raw_sentinels = data.get("redis_sentinels") or []
if isinstance(raw_sentinels, str):
redis_sentinels = [s.strip() for s in raw_sentinels.split(",") if s.strip()]
elif isinstance(raw_sentinels, list):
redis_sentinels = [str(s).strip() for s in raw_sentinels if str(s).strip()]
else:
redis_sentinels = []
cfg = cls(
api_key=resolved_api_key,
gemini_api_key=data.get("gemini_api_key", cls.gemini_api_key),
llm_base_url=sanitize_llm_http_url(
str(data.get("llm_base_url", cls.llm_base_url)),
),
model=data.get("model", cls.model),
temperature=float(data.get("temperature", cls.temperature)),
max_tokens=int(data.get("max_tokens", cls.max_tokens)),
top_p=_top_p,
openrouter_http_connect_timeout_seconds=float(
data.get(
"openrouter_http_connect_timeout_seconds",
cls.openrouter_http_connect_timeout_seconds,
),
),
openrouter_http_read_timeout_seconds=float(
data.get(
"openrouter_http_read_timeout_seconds",
cls.openrouter_http_read_timeout_seconds,
),
),
openrouter_http_write_timeout_seconds=float(
data.get(
"openrouter_http_write_timeout_seconds",
cls.openrouter_http_write_timeout_seconds,
),
),
openrouter_http_pool_timeout_seconds=float(
data.get(
"openrouter_http_pool_timeout_seconds",
cls.openrouter_http_pool_timeout_seconds,
),
),
system_prompt_file=data.get("system_prompt_file", cls.system_prompt_file),
max_history=int(data.get("max_history", cls.max_history)),
tools_dir=data.get("tools_dir", cls.tools_dir),
tools_service_mode=str(
data.get("tools_service_mode", cls.tools_service_mode)
).strip().lower(),
tools_exec_timeout=float(
data.get("tools_exec_timeout", cls.tools_exec_timeout)
),
tools_local_fallback=bool(
data.get("tools_local_fallback", cls.tools_local_fallback)
),
tools_force_in_process=[
str(x).strip()
for x in (data.get("tools_force_in_process") or [])
if str(x).strip()
],
tools_require_session_record=bool(
data.get("tools_require_session_record", cls.tools_require_session_record)
),
tool_permissions=tool_permissions,
api_keys=api_keys,
prowlarr_base_url=str(data.get("prowlarr_base_url", cls.prowlarr_base_url)),
prowlarr_api_key=str(data.get("prowlarr_api_key", cls.prowlarr_api_key)),
cursor_api_key=str(data.get("cursor_api_key", cls.cursor_api_key)),
redis_url=data.get("redis_url", cls.redis_url),
redis_sentinels=redis_sentinels,
redis_sentinel_master=str(data.get("redis_sentinel_master", cls.redis_sentinel_master or "falkordb")),
redis_tls_cert=data.get("redis_tls_cert", cls.redis_tls_cert),
redis_tls_key=data.get("redis_tls_key", cls.redis_tls_key),
redis_tls_ca=data.get("redis_tls_ca", cls.redis_tls_ca),
redis_tls_verify_peer=bool(
data.get("redis_tls_verify_peer", cls.redis_tls_verify_peer)
),
redis_max_retries=int(
data.get("redis_max_retries", cls.redis_max_retries)
),
redis_health_check_interval=float(
data.get(
"redis_health_check_interval", cls.redis_health_check_interval
)
),
redis_socket_keepalive=bool(
data.get("redis_socket_keepalive", cls.redis_socket_keepalive)
),
redis_socket_connect_timeout=float(
data.get(
"redis_socket_connect_timeout",
cls.redis_socket_connect_timeout,
)
),
redis_socket_timeout=float(
data.get("redis_socket_timeout", cls.redis_socket_timeout)
),
embedding_model=data.get("embedding_model", cls.embedding_model),
embedding_batch_size=int(
data.get("embedding_batch_size", cls.embedding_batch_size)
),
embedding_flush_interval=float(
data.get("embedding_flush_interval", cls.embedding_flush_interval)
),
**cls._parse_vector_store_config(data),
**cls._parse_kg_config(data),
**cls._parse_mementropic_config(data),
**cls._parse_persona_pref_config(data),
**cls._parse_starwiki_config(data),
**cls._parse_attachment_guard_config(data),
**cls._parse_knowledge_anchoring_config(data),
llm_filter_enabled=bool(
data.get("llm_filter_enabled", cls.llm_filter_enabled)
),
rlhf_guardrail_enabled=bool(
data.get("rlhf_guardrail_enabled", cls.rlhf_guardrail_enabled)
),
ego_ablation_enabled=bool(
data.get("ego_ablation_enabled", cls.ego_ablation_enabled)
),
proactive_enabled=bool(proactive_cfg.get("enabled", cls.proactive_enabled)),
proactive_default_frequency=float(
proactive_cfg.get("default_frequency", cls.proactive_default_frequency)
),
proactive_triage_enabled=bool(
proactive_cfg.get("triage_enabled", cls.proactive_triage_enabled)
),
proactive_triage_model=str(
proactive_cfg.get("triage_model", cls.proactive_triage_model)
),
channel_heartbeat_enabled=bool(
ch_hb.get("enabled", cls.channel_heartbeat_enabled)
),
channel_heartbeat_interval_min_s=float(
ch_hb.get("interval_min_s", cls.channel_heartbeat_interval_min_s),
),
channel_heartbeat_interval_max_s=float(
ch_hb.get("interval_max_s", cls.channel_heartbeat_interval_max_s),
),
channel_heartbeat_tick_s=float(
ch_hb.get("tick_s", cls.channel_heartbeat_tick_s),
),
channel_heartbeat_max_channels=int(
ch_hb.get("max_channels", cls.channel_heartbeat_max_channels),
),
channel_heartbeat_concurrency=int(
ch_hb.get("concurrency", cls.channel_heartbeat_concurrency),
),
channel_heartbeat_model=str(
ch_hb.get("model", cls.channel_heartbeat_model),
),
background_scheduler_chat_llm_enabled=bool(
bg_sched.get(
"chat_llm_enabled",
cls.background_scheduler_chat_llm_enabled,
),
),
background_scheduler_log_rag_ingest_enabled=bool(
bg_sched.get(
"log_rag_ingest_enabled",
cls.background_scheduler_log_rag_ingest_enabled,
),
),
legacy_kg_extraction=bool(
data.get("legacy_kg_extraction", cls.legacy_kg_extraction)
),
**(
{}
if ncm_fully_disabled_channels is None
else {
"ncm_fully_disabled_channels": ncm_fully_disabled_channels,
}
),
batch_window=float(data.get("batch_window", cls.batch_window)),
max_batch_size=int(data.get("max_batch_size", cls.max_batch_size)),
dna_vault_path=data.get("dna_vault_path", cls.dna_vault_path),
api_key_encryption_db_path=data.get(
"api_key_encryption_db_path", cls.api_key_encryption_db_path
),
media_cache_dir=data.get("media_cache_dir", cls.media_cache_dir),
media_cache_max_mb=int(
data.get("media_cache_max_mb", cls.media_cache_max_mb)
),
media_download_retry_attempts=int(
data.get(
"media_download_retry_attempts",
cls.media_download_retry_attempts,
)
),
user_sandboxes_dir=data.get("user_sandboxes_dir", cls.user_sandboxes_dir),
user_sandbox_quota_bytes=int(
data.get("user_sandbox_quota_bytes", cls.user_sandbox_quota_bytes),
),
user_sandbox_quota_mode=str(
data.get("user_sandbox_quota_mode", cls.user_sandbox_quota_mode),
),
user_sandbox_loopback_dir=data.get(
"user_sandbox_loopback_dir", cls.user_sandbox_loopback_dir
),
user_sandbox_loopback_index_path=data.get(
"user_sandbox_loopback_index_path",
cls.user_sandbox_loopback_index_path,
),
user_sandbox_remount_on_startup=bool(
data.get(
"user_sandbox_remount_on_startup",
cls.user_sandbox_remount_on_startup,
),
),
tor_gateway_container=data.get(
"tor_gateway_container", cls.tor_gateway_container
),
sandbox_curl_image=data.get("sandbox_curl_image", cls.sandbox_curl_image),
resolve_emojis_as_images=bool(
data.get("resolve_emojis_as_images", cls.resolve_emojis_as_images)
),
max_emojis_per_message=int(
data.get("max_emojis_per_message", cls.max_emojis_per_message)
),
web_host=data.get("web_host", cls.web_host),
web_port=int(data.get("web_port", cls.web_port)),
redis_platform_admin_host=data.get("redis_platform_admin_host", cls.redis_platform_admin_host),
redis_platform_admin_port=int(data.get("redis_platform_admin_port", cls.redis_platform_admin_port)),
journal_systemd_units=(
[
str(x).strip()
for x in data.get("journal_systemd_units") or []
if str(x).strip()
]
if isinstance(data.get("journal_systemd_units"), list)
else [] # empty sentinel → resolved_journal_units() derives from the fleet
),
admin_user_ids=admin_user_ids,
shell_authorized_user_ids=shell_authorized_user_ids,
# Admin-ops / cluster control-ops
bot_service_name=data.get("bot_service_name", cls.bot_service_name),
proxy_service_name=data.get("proxy_service_name", cls.proxy_service_name),
bot_repo_path=data.get("bot_repo_path", cls.bot_repo_path),
control_unit_prefix=data.get("control_unit_prefix", cls.control_unit_prefix),
control_unit_names=(
dict(data.get("control_unit_names"))
if isinstance(data.get("control_unit_names"), dict)
else {}
),
control_proxy_handler_service=data.get(
"control_proxy_handler_service", cls.control_proxy_handler_service
),
control_gateway_restart_grace=float(
data.get("control_gateway_restart_grace", cls.control_gateway_restart_grace)
),
control_service_restart_grace=float(
data.get("control_service_restart_grace", cls.control_service_restart_grace)
),
control_reply_timeout=float(
data.get("control_reply_timeout", cls.control_reply_timeout)
),
control_pull_lock_ttl=int(
data.get("control_pull_lock_ttl", cls.control_pull_lock_ttl)
),
command_sync_cooldown_seconds=int(
data.get("command_sync_cooldown_seconds", cls.command_sync_cooldown_seconds)
),
prompt_context_build_timeout_seconds=float(
data.get("prompt_context_build_timeout_seconds", 900.0),
),
preinference_context_shield_timeout_seconds=float(
data.get("preinference_context_shield_timeout_seconds", 600.0),
),
preinference_gather_timeout_seconds=float(
data.get("preinference_gather_timeout_seconds", 100.0),
),
batch_preprocess_shield_timeout_seconds=float(
data.get("batch_preprocess_shield_timeout_seconds", 30.0),
),
media_preprocess_shield_timeout_seconds=float(
data.get("media_preprocess_shield_timeout_seconds", 30.0),
),
redis_stream_maxlen=int(data.get("redis_stream_maxlen", 100_000)),
channel_semantic_recall_enabled=bool(
data.get(
"channel_semantic_recall_enabled",
cls.channel_semantic_recall_enabled,
),
),
channel_semantic_recall_days=int(
data.get(
"channel_semantic_recall_days", cls.channel_semantic_recall_days
),
),
channel_semantic_recall_top_k=int(
data.get(
"channel_semantic_recall_top_k", cls.channel_semantic_recall_top_k
),
),
channel_semantic_recall_oversample_factor=int(
data.get(
"channel_semantic_recall_oversample_factor",
cls.channel_semantic_recall_oversample_factor,
),
),
channel_semantic_recall_neighbor_before=int(
data.get(
"channel_semantic_recall_neighbor_before",
cls.channel_semantic_recall_neighbor_before,
),
),
channel_semantic_recall_neighbor_after=int(
data.get(
"channel_semantic_recall_neighbor_after",
cls.channel_semantic_recall_neighbor_after,
),
),
channel_semantic_recall_max_total_chars=int(
data.get(
"channel_semantic_recall_max_total_chars",
cls.channel_semantic_recall_max_total_chars,
),
),
channel_semantic_recall_max_window_chars=int(
data.get(
"channel_semantic_recall_max_window_chars",
cls.channel_semantic_recall_max_window_chars,
),
),
channel_semantic_recall_min_similarity=float(
data.get(
"channel_semantic_recall_min_similarity",
cls.channel_semantic_recall_min_similarity,
),
),
channel_semantic_recall_cross_channel_enabled=bool(
data.get(
"channel_semantic_recall_cross_channel_enabled",
cls.channel_semantic_recall_cross_channel_enabled,
),
),
channel_semantic_recall_cross_channel_top_k_channels=int(
data.get(
"channel_semantic_recall_cross_channel_top_k_channels",
cls.channel_semantic_recall_cross_channel_top_k_channels,
),
),
channel_semantic_recall_cross_channel_top_k_hits=int(
data.get(
"channel_semantic_recall_cross_channel_top_k_hits",
cls.channel_semantic_recall_cross_channel_top_k_hits,
),
),
channel_semantic_recall_cross_channel_min_similarity=float(
data.get(
"channel_semantic_recall_cross_channel_min_similarity",
cls.channel_semantic_recall_cross_channel_min_similarity,
),
),
channel_semantic_recall_cross_channel_max_total_chars=int(
data.get(
"channel_semantic_recall_cross_channel_max_total_chars",
cls.channel_semantic_recall_cross_channel_max_total_chars,
),
),
channel_semantic_recall_cross_channel_max_window_chars=int(
data.get(
"channel_semantic_recall_cross_channel_max_window_chars",
cls.channel_semantic_recall_cross_channel_max_window_chars,
),
),
channel_semantic_recall_cross_channel_neighbor_before=int(
data.get(
"channel_semantic_recall_cross_channel_neighbor_before",
cls.channel_semantic_recall_cross_channel_neighbor_before,
),
),
channel_semantic_recall_cross_channel_neighbor_after=int(
data.get(
"channel_semantic_recall_cross_channel_neighbor_after",
cls.channel_semantic_recall_cross_channel_neighbor_after,
),
),
channel_semantic_recall_cross_channel_lookback_messages=int(
data.get(
"channel_semantic_recall_cross_channel_lookback_messages",
cls.channel_semantic_recall_cross_channel_lookback_messages,
),
),
channel_semantic_recall_timeout_seconds=float(
data.get(
"channel_semantic_recall_timeout_seconds",
cls.channel_semantic_recall_timeout_seconds,
),
),
webhook_secret=data.get("webhook_secret", cls.webhook_secret),
admin_panel_base_url=admin_panel_base_url,
session_cookie_domain=session_cookie_domain,
discord_oauth_client_id=discord_oauth.get("client_id", ""),
discord_oauth_client_secret=discord_oauth.get("client_secret", ""),
discord_oauth_redirect_uri=_oauth_redirect,
oauth_encryption_key=oauth_cfg.get("encryption_key", ""),
oauth_base_url=oauth_cfg.get("base_url", ""),
oauth_providers=oauth_providers_raw,
platforms=platform_configs,
log_level=str(data.get("log_level", cls.log_level)),
structured_logging=bool(data.get("structured_logging", cls.structured_logging)),
skills_enabled=bool(skills_cfg.get("enabled", False)),
skills_corpus_roots=skills_corpus_roots,
skills_index_db=str(
skills_cfg.get("index_db", cls.skills_index_db),
),
skills_top_k=int(skills_cfg.get("top_k", cls.skills_top_k)),
skills_similarity_threshold=float(
skills_cfg.get(
"similarity_threshold",
cls.skills_similarity_threshold,
),
),
skills_catalog_max_chars=int(
skills_cfg.get(
"catalog_max_chars",
cls.skills_catalog_max_chars,
),
),
mcpo_enabled=bool(mcpo_cfg.get("enabled", cls.mcpo_enabled)),
mcpo_base_url=str(mcpo_cfg.get("base_url", cls.mcpo_base_url)),
mcpo_api_key=str(mcpo_cfg.get("api_key", cls.mcpo_api_key)),
mcpo_config_path=str(
mcpo_cfg.get("config_path", cls.mcpo_config_path),
),
dangerous_command_warning_enabled=bool(
data.get(
"dangerous_command_warning_enabled",
cls.dangerous_command_warning_enabled,
),
),
dangerous_command_guard_fail_mode=str(
data.get(
"dangerous_command_guard_fail_mode",
getattr(cls, "dangerous_command_guard_fail_mode", "open"),
),
),
dangerous_command_similarity_threshold=float(
data.get(
"dangerous_command_similarity_threshold",
cls.dangerous_command_similarity_threshold,
),
),
dangerous_command_benign_margin=float(
data.get(
"dangerous_command_benign_margin",
cls.dangerous_command_benign_margin,
),
),
tool_similarity_threshold=float(
tc_cfg.get("similarity_threshold", cls.tool_similarity_threshold),
),
tool_top_k=int(tc_cfg.get("top_k", cls.tool_top_k)),
tool_strategy_force_threshold=float(
tc_cfg.get(
"strategy_force_threshold", cls.tool_strategy_force_threshold
),
),
tool_strategy_optional_threshold=float(
tc_cfg.get(
"strategy_optional_threshold",
cls.tool_strategy_optional_threshold,
),
),
tool_group_expansion_threshold=float(
tc_cfg.get(
"group_expansion_threshold",
cls.tool_group_expansion_threshold,
),
),
tool_browser_similarity_threshold=float(
tc_cfg.get(
"browser_similarity_threshold",
cls.tool_browser_similarity_threshold,
),
),
overall_user_id_absolute_override_list=overall_user_id_absolute_override_list,
overall_channel_id_absolute_override_list=overall_channel_id_absolute_override_list,
egregores_global_disabled=bool(
data.get("egregores_global_disabled", cls.egregores_global_disabled)
),
loopfield_global_disabled=bool(
data.get("loopfield_global_disabled", cls.loopfield_global_disabled)
),
ego_ablation_global_disabled=bool(
data.get("ego_ablation_global_disabled", cls.ego_ablation_global_disabled)
),
proactive_global_disabled=bool(
data.get("proactive_global_disabled", cls.proactive_global_disabled)
),
ncm_global_disabled=bool(
data.get("ncm_global_disabled", cls.ncm_global_disabled)
),
flash_mirror_global_disabled=bool(
data.get("flash_mirror_global_disabled", cls.flash_mirror_global_disabled)
),
anamnesis_global_disabled=bool(
data.get("anamnesis_global_disabled", cls.anamnesis_global_disabled)
),
cart_lock_global_enabled=bool(
data.get("cart_lock_global_enabled", cls.cart_lock_global_enabled)
),
lore_amplifier_global_disabled=bool(
data.get("lore_amplifier_global_disabled", cls.lore_amplifier_global_disabled)
),
homeserver=homeserver,
user_id=user_id_val,
password=password,
store_path=store_path,
credentials_file=credentials_file,
)
# --- Environment variable overrides ---------------------------
env_map = {
"API_KEY": "api_key",
"OPENROUTER_API_KEY": "api_key",
"GEMINI_API_KEY": "gemini_api_key",
"LLM_BASE_URL": "llm_base_url",
"OPENROUTER_MODEL": "model",
"OPENROUTER_TEMPERATURE": "temperature",
"OPENROUTER_MAX_TOKENS": "max_tokens",
"OPENROUTER_TOP_P": "top_p",
"OPENROUTER_HTTP_READ_TIMEOUT_SECONDS": (
"openrouter_http_read_timeout_seconds"
),
"BOT_SYSTEM_PROMPT_FILE": "system_prompt_file",
"BOT_MAX_HISTORY": "max_history",
"BOT_TOOLS_DIR": "tools_dir",
"SG_TOOLS_SERVICE_MODE": "tools_service_mode",
"REDIS_URL": "redis_url",
"REDIS_SENTINELS": "redis_sentinels",
"REDIS_SENTINEL_MASTER": "redis_sentinel_master",
"REDIS_TLS_CERT": "redis_tls_cert",
"REDIS_TLS_KEY": "redis_tls_key",
"REDIS_TLS_CA": "redis_tls_ca",
"REDIS_TLS_VERIFY_PEER": "redis_tls_verify_peer",
"REDIS_MAX_RETRIES": "redis_max_retries",
"REDIS_HEALTH_CHECK_INTERVAL": "redis_health_check_interval",
"REDIS_SOCKET_KEEPALIVE": "redis_socket_keepalive",
"REDIS_SOCKET_CONNECT_TIMEOUT": "redis_socket_connect_timeout",
"REDIS_SOCKET_TIMEOUT": "redis_socket_timeout",
"EMBEDDING_MODEL": "embedding_model",
"EMBEDDING_BATCH_SIZE": "embedding_batch_size",
"EMBEDDING_FLUSH_INTERVAL": "embedding_flush_interval",
"STARGAZER_VECTOR_PG_DSN": "vector_pg_dsn",
"STARGAZER_VECTOR_PG_HOST": "vector_pg_host",
"STARGAZER_VECTOR_PG_PORT": "vector_pg_port",
"STARGAZER_VECTOR_PG_DATABASE": "vector_pg_database",
"STARGAZER_VECTOR_PG_USER": "vector_pg_user",
"STARGAZER_VECTOR_PG_PASSWORD": "vector_pg_password",
"STARGAZER_VECTOR_PG_SSLMODE": "vector_pg_sslmode",
"STARGAZER_VECTOR_PG_MIN_SIZE": "vector_pg_min_size",
"STARGAZER_VECTOR_PG_MAX_SIZE": "vector_pg_max_size",
"BOT_MEDIA_CACHE_DIR": "media_cache_dir",
"BOT_MEDIA_CACHE_MAX_MB": "media_cache_max_mb",
"BOT_MEDIA_DOWNLOAD_RETRY_ATTEMPTS": "media_download_retry_attempts",
"STARGAZER_USER_SANDBOXES_DIR": "user_sandboxes_dir",
"STARGAZER_USER_SANDBOX_QUOTA_BYTES": "user_sandbox_quota_bytes",
"STARGAZER_USER_SANDBOX_QUOTA_MODE": "user_sandbox_quota_mode",
"STARGAZER_USER_SANDBOX_LOOPBACK_DIR": "user_sandbox_loopback_dir",
"STARGAZER_USER_SANDBOX_LOOPBACK_INDEX_PATH": (
"user_sandbox_loopback_index_path"
),
"STARGAZER_USER_SANDBOX_REMOUNT_ON_STARTUP": (
"user_sandbox_remount_on_startup"
),
"STARGAZER_TOR_GATEWAY_CONTAINER": "tor_gateway_container",
"STARGAZER_SANDBOX_CURL_IMAGE": "sandbox_curl_image",
"RESOLVE_EMOJIS_AS_IMAGES": "resolve_emojis_as_images",
"MAX_EMOJIS_PER_MESSAGE": "max_emojis_per_message",
"BOT_WEB_HOST": "web_host",
"BOT_WEB_PORT": "web_port",
"REDIS_PLATFORM_ADMIN_HOST": "redis_platform_admin_host",
"REDIS_PLATFORM_ADMIN_PORT": "redis_platform_admin_port",
# Admin panel
"ADMIN_PANEL_BASE_URL": "admin_panel_base_url",
"SESSION_COOKIE_DOMAIN": "session_cookie_domain",
# Webhook
"WEBHOOK_SECRET": "webhook_secret",
# Discord OAuth2
"DISCORD_OAUTH_CLIENT_ID": "discord_oauth_client_id",
"DISCORD_OAUTH_CLIENT_SECRET": "discord_oauth_client_secret",
"DISCORD_OAUTH_REDIRECT_URI": "discord_oauth_redirect_uri",
# OAuth2 token management
"OAUTH_ENCRYPTION_KEY": "oauth_encryption_key",
"OAUTH_BASE_URL": "oauth_base_url",
# LLM quality filter
"LLM_FILTER_ENABLED": "llm_filter_enabled",
# RLHF anti-pattern guardrail
"RLHF_GUARDRAIL_ENABLED": "rlhf_guardrail_enabled",
# Proactive responses
"PROACTIVE_ENABLED": "proactive_enabled",
"PROACTIVE_DEFAULT_FREQUENCY": "proactive_default_frequency",
"PROACTIVE_TRIAGE_ENABLED": "proactive_triage_enabled",
"PROACTIVE_TRIAGE_MODEL": "proactive_triage_model",
"STARGAZER_BACKGROUND_SCHEDULER_CHAT_LLM_ENABLED": (
"background_scheduler_chat_llm_enabled"
),
"STARGAZER_BACKGROUND_SCHEDULER_LOG_RAG_INGEST_ENABLED": (
"background_scheduler_log_rag_ingest_enabled"
),
"STARGAZER_LEGACY_KG_EXTRACTION": "legacy_kg_extraction",
# Enhanced Logging configuration env overrides
"STARGAZER_LOG_LEVEL": "log_level",
"STARGAZER_STRUCTURED_LOGGING": "structured_logging",
# Legacy Matrix env vars
"MATRIX_HOMESERVER": "homeserver",
"MATRIX_USER_ID": "user_id",
"MATRIX_PASSWORD": "password",
"MATRIX_STORE_PATH": "store_path",
"MATRIX_CREDENTIALS_FILE": "credentials_file",
"STARGAZER_SKILLS_ENABLED": "skills_enabled",
"MCPO_ENABLED": "mcpo_enabled",
"MCPO_BASE_URL": "mcpo_base_url",
"MCPO_API_KEY": "mcpo_api_key",
"MCPO_CONFIG_PATH": "mcpo_config_path",
"CURSOR_API_KEY": "cursor_api_key",
"DANGEROUS_COMMAND_WARNING_ENABLED": "dangerous_command_warning_enabled",
"DANGEROUS_COMMAND_GUARD_FAIL_MODE": "dangerous_command_guard_fail_mode",
"DANGEROUS_COMMAND_SIMILARITY_THRESHOLD": (
"dangerous_command_similarity_threshold"
),
"DANGEROUS_COMMAND_BENIGN_MARGIN": "dangerous_command_benign_margin",
"TOOL_SIMILARITY_THRESHOLD": "tool_similarity_threshold",
"TOOL_TOP_K": "tool_top_k",
"TOOL_STRATEGY_FORCE_THRESHOLD": "tool_strategy_force_threshold",
"TOOL_STRATEGY_OPTIONAL_THRESHOLD": "tool_strategy_optional_threshold",
"TOOL_GROUP_EXPANSION_THRESHOLD": "tool_group_expansion_threshold",
"TOOL_BROWSER_SIMILARITY_THRESHOLD": "tool_browser_similarity_threshold",
"STARGAZER_STARWIKI_ENABLED": "starwiki_enabled",
"STARGAZER_STARWIKI_ROOT": "starwiki_root",
"STARGAZER_STARWIKI_WORKER_MODEL": "starwiki_worker_model",
"STARGAZER_STARWIKI_LINT_INTERVAL_MINUTES": (
"starwiki_lint_interval_minutes"
),
}
for env_var, attr in env_map.items():
val = os.environ.get(env_var)
if val is not None:
current = getattr(cfg, attr)
if isinstance(current, bool):
setattr(cfg, attr, val.lower() not in ("0", "false", "no", ""))
elif attr == "top_p":
if val.strip():
setattr(cfg, attr, float(val))
else:
setattr(cfg, attr, cls.top_p)
elif isinstance(current, float):
setattr(cfg, attr, float(val))
elif isinstance(current, int):
setattr(cfg, attr, int(val))
else:
setattr(cfg, attr, val)
cfg.llm_base_url = sanitize_llm_http_url(cfg.llm_base_url)
# Discord token from env var
discord_token = os.environ.get("DISCORD_TOKEN")
if discord_token:
# Check if a discord platform already exists
has_discord = any(p.type == "discord" for p in cfg.platforms)
if not has_discord:
cfg.platforms.append(
PlatformConfig(
type="discord",
enabled=True,
settings={"token": discord_token},
)
)
else:
# Update existing discord platform token
for p in cfg.platforms:
if p.type == "discord":
p.settings["token"] = discord_token
# OAuth provider credentials from env vars
for provider in ("github", "google", "discord", "microsoft"):
env_prefix = f"OAUTH_{provider.upper()}"
cid = os.environ.get(f"{env_prefix}_CLIENT_ID", "")
csecret = os.environ.get(f"{env_prefix}_CLIENT_SECRET", "")
if cid or csecret:
if provider not in cfg.oauth_providers:
cfg.oauth_providers[provider] = {}
if cid:
cfg.oauth_providers[provider]["client_id"] = cid
if csecret:
cfg.oauth_providers[provider]["client_secret"] = csecret
roots_s = os.environ.get("STARGAZER_SKILLS_CORPUS_ROOTS", "").strip()
if roots_s:
cfg.skills_corpus_roots = [
p.strip() for p in roots_s.split(",") if p.strip()
]
# Resolve Redis mTLS paths relative to the config file so certificate
# paths like ``tls/client.crt`` work regardless of process CWD (systemd,
# pytest, scripts).
_cfg_dir = config_path.resolve().parent
def _abs_redis_tls_path(p: str) -> str:
    """Anchor a Redis mTLS path to the directory holding the config file.

    Helper closure used while loading the configuration. Relative
    certificate paths (for example ``tls/client.crt``) are joined onto
    ``_cfg_dir`` -- the enclosing function's local holding the resolved
    parent directory of the config file -- and canonicalised with
    ``Path.resolve()``, so the result does not depend on the process
    working directory.

    It is applied to :attr:`Config.redis_tls_cert`,
    :attr:`Config.redis_tls_key` and :attr:`Config.redis_tls_ca` right
    after its definition. Only path manipulation happens here; nothing is
    opened or read.

    Args:
        p (str): A certificate path; may be empty, whitespace-only,
            relative, or already absolute.

    Returns:
        str: *p* itself when it is falsy or whitespace-only; the path
        re-rendered through ``str(Path(p))`` when it is already absolute;
        otherwise the resolved absolute path under ``_cfg_dir``.
    """
    is_blank = not p or not str(p).strip()
    if is_blank:
        # Nothing configured: hand the original value back unchanged.
        return p
    candidate = Path(p)
    anchored = (
        candidate
        if candidate.is_absolute()
        else (_cfg_dir / candidate).resolve()
    )
    return str(anchored)
cfg.redis_tls_cert = _abs_redis_tls_path(cfg.redis_tls_cert)
cfg.redis_tls_key = _abs_redis_tls_path(cfg.redis_tls_key)
cfg.redis_tls_ca = _abs_redis_tls_path(cfg.redis_tls_ca)
if cfg.mcpo_config_path and not Path(cfg.mcpo_config_path).is_absolute():
cfg.mcpo_config_path = str(
(_cfg_dir / cfg.mcpo_config_path).resolve(),
)
# --- Webhook hygiene (empty secret => /api/webhook disabled at runtime) ---
import ipaddress
import logging as _logging
_cfg_log = _logging.getLogger(__name__)
if not (cfg.webhook_secret or "").strip():
wh = (cfg.web_host or "").strip().lower()
public_bind = False
if wh in ("0.0.0.0", "::", "[::]"):
public_bind = True
else:
try:
ip = ipaddress.ip_address(wh)
public_bind = not (ip.is_loopback or ip.is_link_local)
except ValueError:
public_bind = wh not in ("127.0.0.1", "localhost", "::1")
if public_bind:
_cfg_log.warning(
"webhook_secret is empty — /api/webhook returns 401 until set "
"(web_host=%s)",
cfg.web_host,
)
return cfg
[docs]
def redis_ssl_kwargs(self) -> dict:
    """Build the ``ssl_*`` keyword arguments for a Redis client.

    The client certificate and private key come from
    :attr:`redis_tls_cert` / :attr:`redis_tls_key`, and the CA bundle from
    :attr:`redis_tls_ca`. Peer-certificate verification and hostname
    checking both follow :attr:`redis_tls_verify_peer`: when it is truthy
    ``ssl_cert_reqs`` is ``"required"``, otherwise ``"none"``.

    Returns:
        dict: A freshly built mapping of ``ssl_keyfile``, ``ssl_certfile``,
        ``ssl_ca_certs``, ``ssl_cert_reqs`` and ``ssl_check_hostname``; or
        an empty dict when either the certificate or the key is unset, so
        callers can unpack the result unconditionally.
    """
    mtls_configured = bool(self.redis_tls_cert and self.redis_tls_key)
    if mtls_configured:
        verify_peer = self.redis_tls_verify_peer
        return {
            "ssl_keyfile": self.redis_tls_key,
            "ssl_certfile": self.redis_tls_cert,
            "ssl_ca_certs": self.redis_tls_ca,
            "ssl_cert_reqs": "required" if verify_peer else "none",
            "ssl_check_hostname": verify_peer,
        }
    # Without both halves of the client key pair there is no mTLS to set up.
    return {}
[docs]
def redis_resilience_kwargs(self) -> dict[str, Any]:
    """Assemble retry, keepalive and timeout options for ``redis.asyncio``.

    Always included:

    * ``retry`` -- a new :class:`~redis.asyncio.retry.Retry` with
      exponential backoff (base 0.05, cap 1.0) and
      ``max(0, int(redis_max_retries))`` attempts. A new object is built on
      every call, so separate callers never share one ``Retry`` instance.
    * ``retry_on_error`` -- the Redis ``ConnectionError`` and
      ``TimeoutError`` classes.
    * ``socket_keepalive`` -- ``bool(redis_socket_keepalive)``.

    Included only when the matching attribute is truthy and greater than
    zero (converted to ``float``): ``health_check_interval``,
    ``socket_connect_timeout`` and ``socket_timeout``.

    The original authors note that ``socket_timeout`` is opt-in on purpose:
    the primary client is shared with blocking stream / pub-sub consumers,
    and a global read timeout would cut those long-lived calls short. They
    also describe the result as safe to unpack into both ``from_url`` and
    ``Sentinel.master_for``.

    Returns:
        dict[str, Any]: Keyword arguments for a Redis client constructor.
    """
    from redis.asyncio.retry import Retry
    from redis.backoff import ExponentialBackoff
    from redis.exceptions import (
        ConnectionError as RedisConnectionError,
        TimeoutError as RedisTimeoutError,
    )

    attempt_budget = max(0, int(self.redis_max_retries))
    backoff = ExponentialBackoff(cap=1.0, base=0.05)
    options: dict[str, Any] = {
        "retry": Retry(backoff, attempt_budget),
        "retry_on_error": [RedisConnectionError, RedisTimeoutError],
        "socket_keepalive": bool(self.redis_socket_keepalive),
    }

    # Each of these is emitted only for a strictly positive setting;
    # zero / None / negative means "leave the client default alone".
    optional_positive = (
        ("health_check_interval", self.redis_health_check_interval),
        ("socket_connect_timeout", self.redis_socket_connect_timeout),
        ("socket_timeout", self.redis_socket_timeout),
    )
    for option_name, configured in optional_positive:
        if configured and configured > 0:
            options[option_name] = float(configured)
    return options
[docs]
def build_async_redis_client(self, *, decode_responses: bool = True) -> Any:
    """Create a ``redis.asyncio`` client, going through Sentinel when set.

    When :attr:`redis_sentinels` is non-empty, each entry is read as
    ``host:port`` (an entry that does not split into exactly two
    colon-separated parts uses its first part as host and port 26379), a
    :class:`~redis.asyncio.sentinel.Sentinel` is built, and the client
    returned by ``master_for(redis_sentinel_master)`` is handed back. If
    mTLS kwargs are available, ``ssl=True`` is added and the same TLS
    options are passed both as ``sentinel_kwargs`` and as connection
    kwargs.

    Otherwise the client is built with ``from_url`` on :attr:`redis_url`,
    with TLS kwargs taken from :meth:`redis_connection_kwargs_for_url`.

    In both paths the options from :meth:`redis_resilience_kwargs` are
    merged in last, so they win over any TLS key of the same name. The
    original note says this wiring is meant to mirror ``MessageCache``
    (not visible here) so every subsystem rides out a Sentinel failover
    the same way.

    Args:
        decode_responses (bool): Forwarded to the client factory.

    Returns:
        Any: The client returned by ``master_for`` or ``from_url``.
    """
    import redis.asyncio as aioredis
    from redis.asyncio.sentinel import Sentinel

    retry_opts = self.redis_resilience_kwargs()

    if not self.redis_sentinels:
        # Direct connection: TLS kwargs are only produced for rediss:// URLs.
        tls_opts = self.redis_connection_kwargs_for_url(self.redis_url)
        return aioredis.from_url(
            self.redis_url,
            decode_responses=decode_responses,
            **{**tls_opts, **retry_opts},
        )

    tls_opts = dict(self.redis_ssl_kwargs())
    if tls_opts:
        tls_opts["ssl"] = True

    nodes: list[tuple[str, int]] = []
    for entry in self.redis_sentinels:
        host, *remainder = entry.split(":")
        # Exactly one trailing component means an explicit port was given.
        port = int(remainder[0]) if len(remainder) == 1 else 26379
        nodes.append((host, port))

    monitor = Sentinel(
        nodes,
        sentinel_kwargs=tls_opts,
        **{**tls_opts, **retry_opts},
    )
    return monitor.master_for(
        self.redis_sentinel_master,
        decode_responses=decode_responses,
        **retry_opts,
    )
[docs]
def redis_connection_kwargs_for_url(self, url: str) -> dict[str, Any]:
    """Return TLS kwargs for ``redis.asyncio.from_url`` for ``rediss://`` only.

    Per the original note, TLS kwargs passed together with a ``redis://``
    URL select a non-SSL connection class that rejects ``ssl_*``
    parameters, so nothing is returned unless *url* starts with
    ``rediss://``.

    For a ``rediss://`` URL the result starts as a copy of
    :meth:`redis_ssl_kwargs`. If the URL's host is a loopback name
    (``localhost``, ``127.0.0.1`` or ``::1``) and the mTLS settings did not
    already decide them, ``ssl_cert_reqs`` defaults to ``"none"`` and
    ``ssl_check_hostname`` to ``False``.

    Args:
        url (str): The Redis connection URL.

    Returns:
        dict[str, Any]: Keyword arguments to unpack into ``from_url``;
        empty for non-``rediss://`` URLs.
    """
    from urllib.parse import urlparse

    if not url.startswith("rediss://"):
        return {}

    # Copy so the loopback defaults below never mutate a dict owned by
    # whoever produced the base kwargs.
    kwargs = dict(self.redis_ssl_kwargs())

    try:
        raw_host = urlparse(url).hostname or ""
    except ValueError:
        # urlparse rejects a malformed authority (e.g. an unbalanced IPv6
        # bracket). The host cannot be classified then, so the permissive
        # loopback defaults are simply not applied. Other exception types
        # are deliberately not swallowed.
        return kwargs

    # Normalize hostname (strip brackets for IPv6 loopback [::1]).
    hostname = raw_host.lower().strip("[] ").strip()
    if hostname in ("localhost", "127.0.0.1", "::1"):
        kwargs.setdefault("ssl_cert_reqs", "none")
        kwargs.setdefault("ssl_check_hostname", False)
    return kwargs
[docs]
def __post_init__(self) -> None:
    """Sync the ego-ablation flag and downgrade loopback ``rediss://`` URLs.

    Dataclass post-init hook, run by the dataclass machinery after
    construction rather than called directly. It does two things:

    1. Imports the ``ego_ablation`` module and copies
       :attr:`ego_ablation_enabled` onto its module-level ``enabled``
       attribute.
    2. If :attr:`redis_url` starts with ``rediss://``, its host is a
       loopback name (``localhost`` / ``127.0.0.1`` / ``::1``), and no
       client certificate/key pair is configured
       (:attr:`redis_tls_cert` and :attr:`redis_tls_key`), rewrites the
       scheme to plain ``redis://``. The stated intent is that local
       development does not fail a TLS handshake against a non-TLS
       server.

    A URL that ``urllib.parse.urlparse`` rejects with ``ValueError`` is
    left unchanged; other exception types are not swallowed.

    Returns:
        None
    """
    import ego_ablation
    from urllib.parse import urlparse

    ego_ablation.enabled = self.ego_ablation_enabled

    # Automatically downgrade local rediss:// URLs to redis:// ONLY when
    # mTLS is not configured.
    tls_scheme = "rediss://"
    if self.redis_url and self.redis_url.startswith(tls_scheme):
        try:
            raw_host = urlparse(self.redis_url).hostname or ""
        except ValueError:
            # Malformed authority (e.g. unbalanced IPv6 bracket): the host
            # cannot be classified, so the URL is kept as configured.
            raw_host = ""
        # Normalize hostname (strip brackets for IPv6 loopback [::1]).
        hostname = raw_host.lower().strip("[] ").strip()
        is_loopback = hostname in ("localhost", "127.0.0.1", "::1")
        if is_loopback and not (self.redis_tls_cert and self.redis_tls_key):
            # Swap only the scheme; everything after it is preserved.
            self.redis_url = "redis://" + self.redis_url[len(tls_scheme):]