"""Per-user, per-channel LLM inference overrides (!apiurl / !model / !apikey).
Redis hash at ``stargazer:user_llm:{user_id}:{platform}:{channel_id}`` with
optional fields ``api_url``, ``model``, ``api_key`` (encrypted at rest), and
``toggle_specific`` (stored as ``"1"`` or ``"0"``).
"""
from __future__ import annotations
import logging
import re
from typing import Any
from urllib.parse import urlparse, urlunparse
from api_key_encryption import (
decrypt,
encrypt,
get_or_create_user_key,
is_encrypted,
resolve_master_key,
)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Prefix shared by every per-user override hash key (see ``redis_key``).
REDIS_KEY_PREFIX = "stargazer:user_llm"
# Fallback location of the encryption-key SQLite database, used by
# ``_encryption_db_path`` when the config does not supply a path.
_DEFAULT_ENCRYPTION_DB_PATH = "data/api_key_encryption_keys.db"
# Upper bounds (in characters) enforced by ``set_user_llm_field`` before a value is stored.
MAX_API_URL_LEN = 2048
MAX_MODEL_LEN = 512
# When model ids appear in chat-facing error banners (upstream proxy echoes), cap width.
MAX_MODEL_PUBLIC_DISPLAY_LEN = 120
MAX_API_KEY_LEN = 256
# Field names recognized in the override hash: setters reject any other name and
# the reader skips any other field it finds in Redis.
_HASH_FIELDS = frozenset({"api_url", "model", "api_key", "toggle_specific"})
[docs]
def redis_key(user_id: str, platform: str, channel_id: str) -> str:
    """Return the Redis hash key that stores one user's LLM overrides for a channel.

    The key is :data:`REDIS_KEY_PREFIX` followed by the user, platform and channel,
    separated by colons: ``stargazer:user_llm:{user_id}:{platform}:{channel_id}``.
    The ``api_url`` / ``model`` / ``api_key`` / ``toggle_specific`` fields live in the
    hash at that key. Scoping the key by channel is what allows one user to have
    different inference overrides in different rooms.

    Used by :func:`get_user_llm_config`, :func:`set_user_llm_field`,
    :func:`clear_user_llm_field` and :func:`clear_all_user_llm_config` to address the
    hash. This function only formats a string; it performs no I/O.

    Args:
        user_id: Identifier of the user owning the overrides.
        platform: Source platform name (e.g. ``discord``, ``matrix``).
        channel_id: Channel/room identifier scoping the overrides.

    Returns:
        str: The fully qualified Redis hash key.
    """
    return "{}:{}:{}:{}".format(REDIS_KEY_PREFIX, user_id, platform, channel_id)
# ``/v1`` that is NOT the start of a longer identifier. Excluding ASCII letters as well
# as digits keeps ``/api/v10`` and real versioned bases such as ``/v1beta/openai``
# from being mistaken for ``/v1`` followed by junk.
_V1_SEGMENT = re.compile(r"/v1(?![0-9A-Za-z])")


def _truncate_junk_after_openai_v1_path(url: str) -> str:
    """Drop Discord-embed / pasted HTML after ``/v1`` when it is not a normal subpath.

    Legitimate bases keep ``/v1/chat/...``; junk like ``/v1%3Cembed...`` becomes ``/v1``
    (and any query string or fragment is discarded together with the junk). A ``/v1``
    that continues with a letter or digit (``/api/v10``, ``/v1beta/openai``) is treated
    as a different path segment and left alone.

    Args:
        url: Candidate base URL, already stripped of surrounding whitespace.

    Returns:
        str: ``url`` unchanged when no junk follows ``/v1`` (or when the URL cannot be
        parsed at all), otherwise the URL rebuilt with its path cut right after ``/v1``.
    """
    try:
        parsed = urlparse(url)
    except ValueError:
        # ``urlparse`` rejects malformed bracketed hosts (e.g. ``http://[bad``). This is
        # a sanitizer for user-pasted text, so hand the value back untouched instead of
        # letting the parse error escape to the caller.
        return url
    path = parsed.path or ""
    match = _V1_SEGMENT.search(path)
    if match is None:
        return url
    end = match.end()
    # ``/v1`` ends the path, or a real subpath (``/v1/...``) follows: nothing to trim.
    if end >= len(path) or path[end] == "/":
        return url
    trimmed = parsed._replace(path=path[:end], query="", fragment="")
    return urlunparse(trimmed)
[docs]
def sanitize_llm_http_url(url: str) -> str:
    """Clean a user-supplied OpenAI-compatible base URL before it is used or stored.

    Three steps, in order: surrounding whitespace is trimmed, every carriage return and
    line feed is removed (embedded line breaks are what header-injection tricks rely
    on), and the result is passed to :func:`_truncate_junk_after_openai_v1_path`, which
    discards Discord-embed or pasted-HTML residue glued onto ``/v1``.

    Used by :func:`chat_completions_url` in this module. Pure string work; no I/O.

    Args:
        url: Raw base URL, possibly with whitespace, newlines, or trailing junk.

    Returns:
        str: The cleaned base URL (empty string if ``url`` was falsy).
    """
    text = url if url else ""
    text = text.strip()
    for line_break in ("\r", "\n"):
        text = text.replace(line_break, "")
    return _truncate_junk_after_openai_v1_path(text)
# Punctuation that ``sanitize_llm_model_id_display`` keeps (in addition to
# alphanumerics) when echoing a model id into chat-facing text.
_ALLOWED_LLM_MODEL_ID_DISPLAY_CHARS = frozenset("./:_-")
# Lowercase HTML/SVG element names. ``_looks_like_attack_model_echo`` rejects a
# candidate whose first ``/``-separated part (lowercased) is one of these.
_PATH_SEGMENT_DENY = frozenset(
{
"img",
"svg",
"body",
"html",
"iframe",
"math",
"video",
"audio",
"canvas",
"picture",
"foreignobject",
}
)
def _looks_like_attack_model_echo(segment: str) -> bool:
    """Report whether a glyph-whitelisted model-id fragment still resembles markup.

    Even text limited to ``A-Za-z0-9./:_-`` can spell out HTML/JS scaffolding, so this
    second-stage check rejects a fragment when any of the following holds:

    * it contains an event-handler style token (``onerror``, ``onload``, ``onclick``),
      ``javascript`` or ``iframe``, compared case-insensitively;
    * it starts or ends with ``/``, or contains ``//``;
    * its first ``/``-separated part is listed in :data:`_PATH_SEGMENT_DENY`;
    * it has more than six ``/``-separated parts;
    * it contains ``script`` as a stand-alone token (not flanked by letters or digits).

    Used by :func:`sanitize_llm_model_id_display` both to filter candidate chunks and to
    veto the final result. Pure string inspection with no side effects.

    Args:
        segment: A candidate model-id fragment, already glyph-whitelisted.

    Returns:
        bool: ``True`` if the segment resembles markup and must not be echoed.
    """
    lowered = segment.lower()
    for token in ("onerror", "onload", "onclick", "javascript", "iframe"):
        if token in lowered:
            return True
    has_edge_slash = segment.startswith("/") or segment.endswith("/")
    if has_edge_slash or "//" in segment:
        return True
    pieces = segment.split("/")
    if pieces and pieces[0].lower() in _PATH_SEGMENT_DENY:
        return True
    if len(pieces) > 6:
        return True
    # Catches stray ``script`` tokens while allowing ``typescript``-style identifiers.
    stray_script = re.search(r"(?is)(?<![a-z0-9])script(?![a-z0-9])", segment)
    return stray_script is not None
[docs]
def sanitize_llm_model_id_display(
    raw: str | None,
    *,
    max_len: int = MAX_MODEL_PUBLIC_DISPLAY_LEN,
) -> str:
    """Reduce *raw* to a plain provider-style model id that is safe to echo in UI text.

    A ``model`` value may come from a user override or a hostile upstream body, so it
    must never carry angle brackets, quotes, Markdown, JSON breaks, control characters,
    or similar payloads into chat-facing output. The value is split into runs of
    ``A-Za-z0-9./:_-``; runs that :func:`_looks_like_attack_model_echo` flags are
    discarded when at least one clean run exists; the longest remaining run is stripped
    of leading/trailing punctuation and cut to ``max_len``.

    Args:
        raw: Untrusted model identifier; ``None`` is accepted.
        max_len: Maximum length of the returned id. Only the first ``max_len * 4``
            characters of the input are scanned.

    Returns:
        str: The sanitized id, or ``"unknown-model"`` when the input is empty, too
        short to be meaningful (two characters or fewer without a ``/``), or still
        looks like markup after cleaning.
    """
    fallback = "unknown-model"
    text = "" if raw is None else str(raw).strip()
    if not text:
        return fallback

    # Scan only a bounded window so an oversized value cannot cost unbounded work.
    window = text[: max_len * 4]
    candidates = []
    for run in re.split(r"[^A-Za-z0-9./:_\-]+", window):
        if not run:
            continue
        candidates.append(
            "".join(
                ch
                for ch in run
                if ch.isalnum() or ch in _ALLOWED_LLM_MODEL_ID_DISPLAY_CHARS
            )
        )
    if not candidates:
        return fallback

    # Prefer runs that do not resemble markup; fall back to all runs if none is clean
    # (the final veto below still applies to whatever gets picked).
    trusted = [c for c in candidates if c and not _looks_like_attack_model_echo(c)]
    longest = max(trusted or candidates, key=len)
    trimmed = longest.strip("./:_-") or longest
    result = trimmed[:max_len]

    if not result:
        return fallback
    if len(result) <= 2 and "/" not in result:
        return fallback
    if _looks_like_attack_model_echo(result):
        return fallback
    return result
[docs]
def chat_completions_url(base_url: str) -> str:
    """Build the chat-completions endpoint for an OpenAI-compatible base URL.

    The base is first cleaned by :func:`sanitize_llm_http_url` (removing pasted junk and
    line breaks), any trailing slashes are dropped, and ``/chat/completions`` is
    appended. Per-user override URLs and the bot default therefore share the same
    sanitized path-building rules. Pure string work; no I/O.

    Args:
        base_url: An OpenAI-compatible API base (e.g. ending in ``/v1``).

    Returns:
        str: The sanitized base with ``/chat/completions`` appended.
    """
    cleaned = sanitize_llm_http_url(base_url)
    without_trailing_slash = cleaned.rstrip("/")
    return f"{without_trailing_slash}/chat/completions"
def _encryption_db_path(config: Any | None) -> str:
"""Resolve the SQLite path for the per-user API-key encryption database.
Returns ``config.api_key_encryption_db_path`` when the config provides it,
otherwise the module default :data:`_DEFAULT_ENCRYPTION_DB_PATH`. Used by the
get/set helpers in this module to locate the key store when decrypting or
encrypting a user's LLM overrides.
Args:
config: Optional bot config; only ``api_key_encryption_db_path`` is read.
Returns:
str: Absolute path to the encryption-key SQLite database.
"""
if config is not None and hasattr(config, "api_key_encryption_db_path"):
return str(config.api_key_encryption_db_path)
return _DEFAULT_ENCRYPTION_DB_PATH
[docs]
async def get_user_llm_config(
redis,
user_id: str,
platform: str,
channel_id: str,
*,
config: Any | None = None,
) -> dict[str, Any]:
"""Load and decrypt a user's per-channel LLM overrides from Redis.
Reads the hash addressed by :func:`redis_key` via ``HGETALL``, keeps only the
recognized fields in :data:`_HASH_FIELDS`, coerces ``toggle_specific`` to a boolean,
and transparently decrypts a stored ``api_key`` when it is encrypted at rest. For an
encrypted key it resolves the process master key via ``resolve_master_key`` and the
per-user key via ``get_or_create_user_key`` (which reads the SQLite store located by
:func:`_encryption_db_path`), then calls ``decrypt``. This is the read side of the
``!apiurl`` / ``!model`` / ``!apikey`` override feature, letting inference honor a
user's custom endpoint, model, and credentials for one channel.
Failures are swallowed defensively and logged: a Redis error, a missing master key, or
a decrypt error each cause the affected field (or the whole result) to be omitted
rather than raised, so a bad override never breaks message handling.
Called by ``message_processor/generate_and_send.py`` before building an inference
request to fetch any active overrides, and exercised by ``tests/test_proxy_toggle.py``.
Touches Redis (``HGETALL``) and, for an encrypted key, the encryption-key SQLite
database.
Args:
redis: Async Redis client; ``None`` short-circuits to an empty result.
user_id: User whose overrides to load; falsy short-circuits to empty.
platform: Source platform name.
channel_id: Channel/room identifier.
config: Optional bot config, used only to locate the encryption-key database.
Returns:
dict[str, Any]: Present overrides among ``api_url``, ``model``, ``api_key``
(decrypted), and ``toggle_specific`` (bool). Empty if nothing is set, Redis is
unavailable, or a decrypted ``api_key`` could not be recovered.
"""
if redis is None or not user_id:
return {}
key = redis_key(user_id, platform, channel_id)
try:
raw = await redis.hgetall(key)
except Exception:
logger.exception("Redis hgetall failed for %s", key)
return {}
if not raw:
return {}
out: dict[str, str] = {}
sqlite_path = _encryption_db_path(config)
master_key = resolve_master_key()
for field_b, val_b in raw.items():
field = field_b.decode() if isinstance(field_b, bytes) else field_b
val = val_b.decode() if isinstance(val_b, bytes) else val_b
if field not in _HASH_FIELDS:
continue
if field == "api_key":
if not val:
continue
if is_encrypted(val):
if not master_key:
logger.warning(
"Encrypted user LLM api_key but API_KEY_MASTER_KEY unset",
)
continue
try:
user_key = await get_or_create_user_key(
user_id,
sqlite_path,
master_key,
)
out["api_key"] = decrypt(val, user_key)
except Exception:
logger.exception(
"Failed to decrypt user LLM api_key for %s",
user_id,
)
else:
out["api_key"] = val
elif field == "toggle_specific":
out[field] = val == "1"
else:
out[field] = val
return out
[docs]
async def set_user_llm_field(
redis,
user_id: str,
platform: str,
channel_id: str,
field: str,
value: str,
*,
config: Any | None = None,
) -> str | None:
"""Validate and persist one LLM-override field to the user's Redis hash.
The write side of the ``!apiurl`` / ``!model`` / ``!apikey`` commands. It normalizes
the field name and value, rejects anything outside :data:`_HASH_FIELDS`, and applies
per-field rules: ``api_url`` must be ``http``/``https`` and within
:data:`MAX_API_URL_LEN`; ``model`` must be non-empty and within :data:`MAX_MODEL_LEN`;
``toggle_specific`` must be ``1`` or ``0``; and ``api_key`` must satisfy the length
bounds and is encrypted before storage. Encryption requires a configured master key
(``resolve_master_key``) and a per-user key from ``get_or_create_user_key`` (reading
the SQLite store at :func:`_encryption_db_path`), after which the value is sealed with
``encrypt``. On success it writes the field with ``HSET`` under the key from
:func:`redis_key`.
Returning the error as a string rather than raising lets the command handler surface a
friendly chat reply.
Called by ``message_processor/processor.py`` from the ``_apply_field`` helper inside
the user-LLM command dispatch, and exercised by ``tests/test_proxy_toggle.py``. Touches
Redis (``HSET``) and, for ``api_key``, the encryption-key SQLite database.
Args:
redis: Async Redis client; ``None`` yields an error string.
user_id: User whose override is being set.
platform: Source platform name.
channel_id: Channel/room identifier.
field: One of ``api_url``, ``model``, ``api_key``, ``toggle_specific``.
value: Raw value to validate and store (encrypted for ``api_key``).
config: Optional bot config, used only to locate the encryption-key database.
Returns:
str | None: A human-readable error message on rejection or Redis failure, or
``None`` when the field was stored successfully.
"""
if redis is None:
return "Redis is not configured — per-user LLM settings are unavailable."
field = field.strip().lower()
if field not in _HASH_FIELDS:
return f"Invalid field {field!r}."
value = value.strip()
if field == "api_url":
if len(value) > MAX_API_URL_LEN:
return f"API URL is too long (max {MAX_API_URL_LEN} characters)."
low = value.lower()
if not low.startswith(("http://", "https://")):
return "API URL must start with http:// or https://."
elif field == "model":
if not value:
return "Model name cannot be empty."
if len(value) > MAX_MODEL_LEN:
return f"Model name is too long (max {MAX_MODEL_LEN} characters)."
elif field == "api_key":
if len(value) < 8:
return "API key is too short or empty."
if len(value) > MAX_API_KEY_LEN:
return f"API key exceeds maximum length ({MAX_API_KEY_LEN} chars)."
master_key = resolve_master_key()
if master_key is None:
return (
"API key encryption is not configured. Set API_KEY_MASTER_KEY "
"(base64-encoded 32-byte key) in the environment to store API keys."
)
sqlite_path = _encryption_db_path(config)
user_key = await get_or_create_user_key(user_id, sqlite_path, master_key)
value = encrypt(value, user_key)
elif field == "toggle_specific":
if value not in ("1", "0"):
return "Toggle value must be '1' (on) or '0' (off)."
key = redis_key(user_id, platform, channel_id)
try:
await redis.hset(key, field, value)
except Exception:
logger.exception("Redis hset failed for user LLM config %s", key)
return "Failed to save setting to Redis."
return None
[docs]
async def clear_user_llm_field(
redis,
user_id: str,
platform: str,
channel_id: str,
field: str,
) -> str | None:
"""Delete a single LLM-override field from the user's Redis hash.
Reverts one setting back to the bot default without disturbing the user's other
overrides for the channel. Normalizes and validates the field name against
:data:`_HASH_FIELDS`, then removes it with ``HDEL`` on the key from :func:`redis_key`;
deleting an absent field is a harmless no-op. Like its siblings it returns the error as
a string so the command handler can echo it to chat rather than raising.
Called by ``message_processor/processor.py`` from the user-LLM command dispatch,
including the ``!clear...`` reset commands and the cleanup that follows a rejected
``set``. Touches Redis (``HDEL``).
Args:
redis: Async Redis client; ``None`` yields an error string.
user_id: User whose override is being cleared.
platform: Source platform name.
channel_id: Channel/room identifier.
field: One of ``api_url``, ``model``, ``api_key``, ``toggle_specific``.
Returns:
str | None: An error message on an invalid field or Redis failure, else ``None``.
"""
if redis is None:
return "Redis is not configured — per-user LLM settings are unavailable."
field = field.strip().lower()
if field not in _HASH_FIELDS:
return f"Invalid field {field!r}."
key = redis_key(user_id, platform, channel_id)
try:
await redis.hdel(key, field)
except Exception:
logger.exception("Redis hdel failed for user LLM config %s", key)
return "Failed to clear setting in Redis."
return None
[docs]
async def clear_all_user_llm_config(
redis,
user_id: str,
platform: str,
channel_id: str,
) -> str | None:
"""Delete a user's entire LLM-override hash for one channel.
Wipes every stored field at once (``api_url``, ``model``, encrypted ``api_key``, and
``toggle_specific``) by issuing a Redis ``DELETE`` on the key from :func:`redis_key`,
fully resetting the user back to bot defaults for that channel. Unlike
:func:`clear_user_llm_field`, this is all-or-nothing and does not touch the encryption
key store. As with the other helpers it reports problems as a returned string instead
of raising.
A convenience entry point exported for callers wanting a one-shot reset; no in-repo
caller currently invokes it (the command dispatch in
``message_processor/processor.py`` clears fields individually). Touches Redis
(``DELETE``).
Args:
redis: Async Redis client; ``None`` yields an error string.
user_id: User whose overrides are being wiped.
platform: Source platform name.
channel_id: Channel/room identifier.
Returns:
str | None: An error message on Redis failure, or ``None`` on success (including
when the hash did not exist).
"""
if redis is None:
return "Redis is not configured — per-user LLM settings are unavailable."
key = redis_key(user_id, platform, channel_id)
try:
await redis.delete(key)
except Exception:
logger.exception("Redis delete failed for user LLM config %s", key)
return "Failed to clear settings in Redis."
return None