"""Per-user API key management + global/channel API key pools (v4)
Per-user keys follow the user across channels and servers.
Pool keys are shared -- either globally or scoped to a single channel.
Resolution order: user key -> channel pool -> global pool -> env/config
Redis key patterns:
stargazer:user_api_keys:{user_id} (per-user hash)
stargazer:global_api_pool:{service} (global pool hash)
stargazer:channel_api_pool:{channel_id}:{service} (channel pool hash)
API keys are encrypted at rest (AES-256-GCM) with per-user keys stored in SQLite.
"""
from __future__ import annotations
import os
from pathlib import Path
import aiohttp
import jsonutil as json
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from tools.alter_privileges import has_privilege, PRIVILEGES
from api_key_encryption import (
api_key_hash,
decrypt,
encrypt,
get_or_create_user_key,
get_pool_key,
is_encrypted,
resolve_master_key,
)
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
_DEFAULT_ENCRYPTION_DB_PATH = "data/api_key_encryption_keys.db"
REDIS_KEY_PREFIX = "stargazer:user_api_keys"
GLOBAL_POOL_PREFIX = "stargazer:global_api_pool"
CHANNEL_POOL_PREFIX = "stargazer:channel_api_pool"
KNOWN_SERVICES = {
"brave": {
"display_name": "Brave Search",
"signup_url": "https://brave.com/search/api/",
},
"elevenlabs": {
"display_name": "ElevenLabs",
"signup_url": "https://elevenlabs.io/",
},
"openrouter": {
"display_name": "OpenRouter",
"signup_url": "https://openrouter.ai/keys",
},
"pollinations": {
"display_name": "Pollinations",
"signup_url": "https://enter.pollinations.ai",
},
"gemini": {
"display_name": "Google Gemini",
"signup_url": "https://aistudio.google.com/apikey",
},
"civitai": {
"display_name": "CivitAI",
"signup_url": "https://civitai.com/user/account",
},
"vultr": {
"display_name": "Vultr",
"signup_url": "https://my.vultr.com/settings/#settingsapi",
},
"sporestack": {
"display_name": "SporeStack",
"signup_url": "https://sporestack.com/api_documentation/",
},
"gandi": {"display_name": "Gandi DNS", "signup_url": "https://account.gandi.net/"},
"cloudflare": {
"display_name": "Cloudflare",
"signup_url": "https://dash.cloudflare.com/profile/api-tokens",
},
"aws": {
"display_name": "Amazon Web Services",
"signup_url": "https://console.aws.amazon.com/iam/",
},
"gcp": {
"display_name": "Google Cloud",
"signup_url": "https://console.cloud.google.com/iam-admin/serviceaccounts",
},
"oci": {"display_name": "Oracle Cloud", "signup_url": "https://cloud.oracle.com/"},
"yt_dlp_cookies": {
"display_name": "yt-dlp Cookies",
"signup_url": "https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp",
},
"gitea": {
"display_name": "Gitea",
"signup_url": "https://docs.gitea.com/development/api-usage",
},
"wolfram_alpha": {
"display_name": "Wolfram|Alpha",
"signup_url": "https://developer.wolframalpha.com/",
},
"cursor": {
"display_name": "Cursor IDE",
"signup_url": "https://cursor.com/dashboard/cloud-agents",
},
}
EXTENDED_LIMIT_SERVICES = {"gcp", "oci", "yt_dlp_cookies", "gitea"}
MAX_KEY_LENGTH_DEFAULT = 256
MAX_KEY_LENGTH_EXTENDED = 8192
BRAVE_SEARCH_URL = "https://api.search.brave.com/res/v1/web/search"
def _redis_key(user_id: str) -> str:
"""Build the Redis hash key holding one user's per-user API keys.
Joins the ``REDIS_KEY_PREFIX`` (``stargazer:user_api_keys``) with the
caller's user id so every per-user lookup, store, and delete in this module
targets a stable, user-scoped hash. Pure string formatting with no I/O.
Called by ``_set_user_api_key``, ``_remove_user_api_key``,
``_list_user_api_keys``, and ``get_user_api_key`` within this module to name
the hash that ``HSET``/``HGET``/``HDEL`` operate on.
Args:
user_id (str): The platform user id whose key hash is being addressed.
Returns:
str: The fully qualified Redis hash key for that user.
"""
return f"{REDIS_KEY_PREFIX}:{user_id}"
def _pool_key(service: str) -> str:
"""Build the Redis hash key for a service's global (shared) API key pool.
Joins ``GLOBAL_POOL_PREFIX`` (``stargazer:global_api_pool``) with the
service name so all donate/withdraw/status/random-pick operations on the
globally shared pool for that service hit the same hash. Pure string
formatting with no I/O.
Called by ``_donate_api_key_to_pool``, ``_withdraw_api_key_from_pool``, and
``get_pool_api_key`` within this module to name the global pool hash.
Args:
service (str): The lowercased service identifier (e.g. ``brave``).
Returns:
str: The fully qualified Redis hash key for that service's global pool.
"""
return f"{GLOBAL_POOL_PREFIX}:{service}"
def _channel_pool_key(channel_id: str, service: str) -> str:
"""Build the Redis hash key for a channel-scoped API key pool.
Joins ``CHANNEL_POOL_PREFIX`` (``stargazer:channel_api_pool``) with the
channel id and service name so a pool shared only among users of one
channel stays isolated from the global pool and from other channels. Pure
string formatting with no I/O.
Called by ``_donate_api_key_to_channel_pool``,
``_withdraw_api_key_from_channel_pool``, and ``get_channel_pool_api_key``
within this module to name the per-channel pool hash.
Args:
channel_id (str): The Discord/Matrix channel identifier owning the pool.
service (str): The lowercased service identifier (e.g. ``brave``).
Returns:
str: The fully qualified Redis hash key for that channel pool.
"""
return f"{CHANNEL_POOL_PREFIX}:{channel_id}:{service}"
def _mask_key(api_key: str) -> str:
"""Return a redacted preview of an API key for safe display.
Reveals only the first and last few characters and elides the middle so
previews can be surfaced to the LLM or user without leaking a usable
secret. Short keys (10 chars or fewer) reveal just two characters on each
side. Pure string slicing with no I/O.
Called within this module wherever a ``key_preview`` field is built --
``_set_user_api_key``, ``_list_user_api_keys``, ``_donate_to_pool``, and
``_withdraw_from_pool`` -- and reused by ``tools/donate_embed_key.py``.
Args:
api_key (str): The plaintext API key to mask.
Returns:
str: A masked preview such as ``abcd...wxyz``.
"""
if len(api_key) <= 10:
return api_key[:2] + "..." + api_key[-2:]
return api_key[:4] + "..." + api_key[-4:]
[docs]
def missing_api_key_error(service: str) -> str:
"""Return a verbose, actionable error message for a missing API key.
Intended to be called by consumer tools when ``get_user_api_key``
returns ``None``. The message includes the service display name,
signup URL (when known), and the exact ``set_user_api_key`` command
so the LLM can relay it to the user.
"""
info = KNOWN_SERVICES.get(service)
display = info["display_name"] if info else service
parts = [
f"{display} API key is not configured.",
"Notify the user that they need to:",
]
if info:
parts.append(f"1. Sign up / get an API key at: {info['signup_url']}")
parts.append(
f"2. Send it to the bot via DM with: set_user_api_key service={service} api_key=YOUR_KEY"
)
else:
parts.append(
f"1. Send their API key via DM with: set_user_api_key service={service} api_key=YOUR_KEY"
)
return " ".join(parts)
# ---------------------------------------------------------------------------
# Per-user daily rate limiting for default (non-user-supplied) API keys
# ---------------------------------------------------------------------------
_DEFAULT_KEY_USAGE_PREFIX = "stargazer:default_key_usage"
_DEFAULT_KEY_TTL = 86400 # 24 hours
[docs]
async def check_default_key_limit(
user_id: str,
tool_name: str,
redis_client,
daily_limit: int = 20,
) -> tuple[bool, int, int]:
"""Check whether a user is within the daily shared-key usage limit.
Reads the per-user, per-tool counter at
``stargazer:default_key_usage:{user_id}:{tool_name}`` so tools that fall back
to a shared (non-user-supplied) API key can cap free usage. The check is
fail-open: any missing Redis client, missing counter, or Redis error is
treated as "allowed" so a Redis hiccup never blocks legitimate calls. Reads
Redis only; ``increment_default_key_usage`` performs the matching write.
Called by many sibling tools that consume shared keys -- e.g.
``tools/generate_image.py``, ``tools/brave_search.py``,
``tools/research_tool.py``, ``tools/play_music.py``,
``tools/youtube_describe.py`` -- typically guarded by
``default_key_limit_applies``.
Args:
user_id (str): The calling user's id.
tool_name (str): The tool the counter is scoped to.
redis_client: The Redis client, or ``None`` to bypass the check.
daily_limit (int): The per-day ceiling (default 20).
Returns:
tuple[bool, int, int]: ``(allowed, current_count, daily_limit)``.
"""
if redis_client is None:
return True, 0, daily_limit
key = f"{_DEFAULT_KEY_USAGE_PREFIX}:{user_id}:{tool_name}"
try:
raw = await redis_client.get(key)
current = int(raw) if raw else 0
except Exception:
return True, 0, daily_limit # fail-open
return current < daily_limit, current, daily_limit
[docs]
async def increment_default_key_usage(
user_id: str,
tool_name: str,
redis_client,
) -> None:
"""Bump the daily shared-key usage counter after a successful call.
Records one use of a shared (non-user-supplied) API key by incrementing the
per-user, per-tool counter at
``stargazer:default_key_usage:{user_id}:{tool_name}`` and (re)setting its TTL
to ``_DEFAULT_KEY_TTL`` (24 hours), so the limit window rolls forward and the
counter self-expires. Issues the INCR and EXPIRE in a single Redis pipeline;
a missing client is a no-op and Redis errors are logged but swallowed so
counting never breaks the tool that just succeeded. The companion read is
``check_default_key_limit``.
Called by the same shared-key tools after they complete a call -- e.g.
``tools/generate_image.py``, ``tools/brave_search.py``,
``tools/research_tool.py``, ``tools/play_music.py``,
``tools/youtube_describe.py``.
Args:
user_id (str): The calling user's id.
tool_name (str): The tool the counter is scoped to.
redis_client: The Redis client, or ``None`` to skip incrementing.
Returns:
None
"""
if redis_client is None:
return
key = f"{_DEFAULT_KEY_USAGE_PREFIX}:{user_id}:{tool_name}"
try:
pipe = redis_client.pipeline()
pipe.incr(key)
pipe.expire(key, _DEFAULT_KEY_TTL)
await pipe.execute()
except Exception as exc:
logger.warning(
"Failed to increment default key usage for %s/%s: %s",
user_id,
tool_name,
exc,
)
[docs]
def default_key_limit_error(tool_name: str, current: int, limit: int) -> str:
"""Build the user-facing message shown when the shared-key limit is hit.
Produces a verbose, actionable string a tool returns to the LLM once
``check_default_key_limit`` reports the user is over quota: it states the
current count and limit and tells the user to set their own (unlimited) key
via ``set_user_api_key``. Pure string formatting with no I/O.
Called by shared-key tools when the daily limit is exceeded -- e.g.
``tools/generate_veo_video.py``, ``tools/brave_search.py``,
``tools/research_tool.py``, ``tools/play_music.py``,
``tools/youtube_describe.py``.
Args:
tool_name (str): The tool the limit applies to.
current (int): The user's current usage count.
limit (int): The daily ceiling that was reached.
Returns:
str: A human-readable explanation and remediation message.
"""
return (
f"Daily usage limit reached for this user ({current}/{limit}) for {tool_name} using the shared API key. "
f"To continue using this tool, please provide your own API key. "
f"You can set one via DM: set_user_api_key service=gemini api_key=YOUR_KEY "
f"(or the appropriate service name). Your own key has no daily limit."
)
[docs]
async def default_key_limit_applies(ctx, using_own_key: bool = False) -> bool:
"""Return True if the shared default-key daily limit should be enforced.
Returns False (i.e. limit is **not** enforced) when any of these hold:
- ``using_own_key`` is True (caller supplied their own API key)
- ``ctx`` is None, or ``ctx.user_id`` / ``ctx.redis`` are missing
- The calling user is a bot admin (``config.admin_user_ids``)
- The calling user has the ``BYPASS_RATELIMIT`` privilege bit set
All other callers are subject to the limit. Safe to call multiple times
per request (one lightweight Redis GET per call).
"""
if using_own_key or ctx is None:
return False
user_id = getattr(ctx, "user_id", None)
redis = getattr(ctx, "redis", None)
if not user_id or not redis:
return False
config = getattr(ctx, "config", None)
admin_ids = getattr(config, "admin_user_ids", None) or []
if str(user_id) in admin_ids:
return False
bypass = await has_privilege(redis, user_id, PRIVILEGES["BYPASS_RATELIMIT"], config)
return not bypass
async def _get_redis(ctx):
"""Return the Redis client from the tool context or raise if absent.
Centralizes the ``ctx.redis`` lookup that every tool handler in this module
needs, failing fast with a clear error when the inference worker handed in a
context without a Redis connection (so handlers never silently no-op against
a missing client). Does no Redis I/O itself; just reads the attribute.
Called by every async handler in this module that touches Redis --
``_set_user_api_key``, ``_remove_user_api_key``, ``_list_user_api_keys``,
and the six pool handlers.
Args:
ctx: The ``ToolContext`` for the current tool invocation.
Returns:
The Redis client bound to ``ctx.redis``.
Raises:
RuntimeError: If ``ctx`` exposes no Redis client.
"""
r = getattr(ctx, "redis", None)
if r is None:
raise RuntimeError("Redis not available")
return r
def _encryption_db_path(ctx) -> str:
"""Resolve the filesystem path to the per-user encryption key database.
Per-user AES key material lives in a local SQLite file; this returns the
configured path (``config.api_key_encryption_db_path``) when present and
otherwise falls back to ``_DEFAULT_ENCRYPTION_DB_PATH``
(``data/api_key_encryption_keys.db``). The returned path is later opened by
``get_or_create_user_key`` from ``api_key_encryption``; this function itself
only reads config attributes and touches no files.
Called by ``_set_user_api_key``, ``_list_user_api_keys``, and
``get_user_api_key`` here, and by several sibling tools
(``tools/manage_secrets.py``, ``tools/sftp_tools.py``,
``tools/totp_tools.py``, ``user_llm_config.py``) that share the same key
store.
Args:
ctx: The ``ToolContext`` (or config-bearing object) to read from.
Returns:
str: The SQLite path for the encryption key database.
"""
config = getattr(ctx, "config", None)
if config and hasattr(config, "api_key_encryption_db_path"):
return config.api_key_encryption_db_path
return _DEFAULT_ENCRYPTION_DB_PATH
async def _validate_brave_key(api_key: str) -> dict:
"""Validate a Brave Search API key by issuing one lightweight live query.
Makes a real outbound HTTPS GET to ``BRAVE_SEARCH_URL`` with the candidate
key in the ``X-Subscription-Token`` header so a bad or exhausted key is
rejected before it is ever stored. Status codes are interpreted: 200 means
valid; 401/402/403 are mapped to distinct failure reasons; 429 is treated as
valid-but-rate-limited (still stored, with a warning); other codes and
network errors are surfaced as failures. Opens and closes its own
``aiohttp.ClientSession`` with a 15-second timeout; no Redis or filesystem
access.
Called by ``_set_user_api_key`` and the shared ``_donate_to_pool`` here,
gating storage of any key whose service is ``brave``.
Args:
api_key (str): The candidate Brave Search API key to test.
Returns:
dict: ``{"valid": bool, ...}`` optionally carrying an ``error`` reason
or a ``warning`` (e.g. when the key is valid but rate-limited).
"""
params = {"q": "test", "count": 1}
headers = {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"X-Subscription-Token": api_key,
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(
BRAVE_SEARCH_URL,
headers=headers,
params=params,
timeout=aiohttp.ClientTimeout(total=15),
) as resp:
if resp.status == 200:
return {"valid": True}
elif resp.status == 401:
return {
"valid": False,
"error": "Invalid API key (401 Unauthorized)",
}
elif resp.status == 402:
return {
"valid": False,
"error": "API key quota exhausted (402 Payment Required)",
}
elif resp.status == 403:
return {
"valid": False,
"error": "API key forbidden (403) - check permissions",
}
elif resp.status == 429:
return {
"valid": True,
"warning": "Key is valid but currently rate-limited; stored anyway",
}
else:
return {
"valid": False,
"error": f"Unexpected response: HTTP {resp.status}",
}
except aiohttp.ClientError as e:
return {"valid": False, "error": f"Network error during validation: {e}"}
# ---------------------------------------------------------------------------
# Gitea credential parsing (token + optional custom base_url)
# ---------------------------------------------------------------------------
GITEA_DEFAULT_BASE = "https://gitea.com"
GITEA_TOKEN_FILE_DEFAULT = "/home/star/large_files/arche_git_token.txt"
def _gitea_fallback_raw_from_env_or_file() -> str | None:
"""Load a Gitea credential from the environment or a token file.
Provides the bottom of the Gitea resolution chain for deployments that have
no user- or pool-stored key: it prefers the ``GITEA_TOKEN`` environment
variable and otherwise reads the file at ``GITEA_TOKEN_FILE`` (defaulting to
``GITEA_TOKEN_FILE_DEFAULT``). The raw value may be a plain token or a JSON
credential blob; parsing is left to the caller. Reads environment variables
and the filesystem; missing or unreadable files yield ``None`` rather than
raising.
Called by ``get_gitea_credentials`` in this module when no Redis-backed
credential is found.
Returns:
str | None: The raw credential string, or ``None`` if neither the env
var nor the file is available.
"""
token = os.environ.get("GITEA_TOKEN", "").strip()
if token:
return token
path = os.environ.get("GITEA_TOKEN_FILE", GITEA_TOKEN_FILE_DEFAULT).strip()
try:
return Path(path).read_text(encoding="utf-8").strip()
except OSError:
return None
def _gitea_fallback_base_url(raw: str) -> str:
"""Resolve the Gitea API base URL for an env/file-sourced credential.
Applies only to the env/file fallback path (never to Redis-stored keys),
where the base URL is governed by ``GITEA_BASE_URL`` (defaulting to
``https://git.neko.li``) unless the raw credential is itself a JSON blob
carrying its own ``base_url``, which then wins. Normalizes the result to
include a scheme and strips any trailing slash. Reads the ``GITEA_BASE_URL``
environment variable; no other side effects.
Called by ``get_gitea_credentials`` in this module, only when the chosen
credential came from ``_gitea_fallback_raw_from_env_or_file``.
Args:
raw (str): The raw credential string (plain token or JSON blob).
Returns:
str: A normalized base URL with scheme and no trailing slash.
"""
fallback = (
os.environ.get("GITEA_BASE_URL", "https://git.neko.li").strip().rstrip("/")
)
if not fallback.startswith(("http://", "https://")):
fallback = "https://git.neko.li"
stripped = raw.strip()
if stripped.startswith("{"):
try:
data = json.loads(stripped)
base = (data.get("base_url") or "").strip().rstrip("/")
if base:
if not base.startswith(("http://", "https://")):
base = "https://" + base.lstrip("/")
return base
except (json.JSONDecodeError, TypeError, AttributeError):
pass
return fallback
return fallback
def _parse_gitea_credential(raw: str) -> tuple[str, str]:
"""Parse a stored Gitea credential into a token and base URL pair.
Accepts either a plain access token (in which case the public
``GITEA_DEFAULT_BASE`` is assumed) or a JSON blob of the form
``{"token": "...", "base_url": "..."}``; this is what lets a user point the
bot at a self-hosted Gitea instance. The base URL is normalized to include a
scheme and have no trailing slash, and malformed JSON falls back to treating
the whole input as a plain token. Pure parsing with no I/O.
Called by ``get_gitea_credentials`` in this module to interpret a
Redis-stored Gitea credential before it is handed to the Gitea client.
Args:
raw (str): The raw stored credential (plain token or JSON blob).
Returns:
tuple[str, str]: ``(token, base_url)``, with ``base_url`` normalized.
"""
raw = raw.strip()
if raw.startswith("{"):
try:
data = json.loads(raw)
token = data.get("token", "").strip()
base = (data.get("base_url") or GITEA_DEFAULT_BASE).strip().rstrip("/")
if not base.startswith(("http://", "https://")):
base = "https://" + base
return token or "", base
except (json.JSONDecodeError, TypeError):
pass
return raw, GITEA_DEFAULT_BASE
[docs]
async def get_gitea_credentials(
user_id: str,
*,
redis_client=None,
channel_id: str | None = None,
fallback_to_pool: bool = True,
config=None,
) -> tuple[str, str] | None:
"""Return (token, base_url) for Gitea, or None.
Resolution order: user key -> channel pool -> global pool -> GITEA_TOKEN ->
GITEA_TOKEN_FILE (default ``GITEA_TOKEN_FILE_DEFAULT``).
Parses plain token or JSON {\"token\": \"...\", \"base_url\": \"...\"}.
Env/file fallbacks use ``GITEA_BASE_URL`` (default https://git.neko.li) when
the raw value is a plain token; JSON may override with ``base_url``.
"""
raw: str | None = None
if redis_client:
raw = await get_user_api_key(
user_id,
"gitea",
redis_client=redis_client,
channel_id=channel_id,
fallback_to_pool=fallback_to_pool,
config=config,
)
from_fallback = False
if not raw:
fb = _gitea_fallback_raw_from_env_or_file()
if fb:
raw = fb
from_fallback = True
if not raw:
return None
token, base_url = _parse_gitea_credential(raw)
if not token:
return None
if from_fallback:
base_url = _gitea_fallback_base_url(raw)
return (token, base_url)
# ---------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------
async def _set_user_api_key(
service: str,
api_key: str,
ctx: ToolContext | None = None,
) -> str:
"""Store one user's personal API key for a service, encrypted at rest.
Handler backing the ``set_user_api_key`` tool. It normalizes and validates
the service name and key length, gates the call on the ``STARGAZER_USE``
privilege (via ``has_privilege`` against Redis), and for the ``brave``
service performs a live key check through ``_validate_brave_key`` before
storing anything. The plaintext key is encrypted with a per-user key
(``get_or_create_user_key`` reads/writes the SQLite key store at
``_encryption_db_path``; ``encrypt`` produces an AES-256-GCM blob) and then
HSET into the user's Redis hash at ``_redis_key(uid)``. Logs the store event
and never returns the raw key -- only a masked preview from ``_mask_key``.
Refuses to store if ``API_KEY_MASTER_KEY`` is unset (``resolve_master_key``
returns ``None``).
Dispatched by ``tool_loader.py`` via the ``TOOLS`` registry's ``handler``
entry; not called directly elsewhere in the repo.
Args:
service (str): The service the key is for (e.g. ``brave``, ``gemini``).
api_key (str): The plaintext API key to store.
ctx (ToolContext | None): The tool context carrying ``redis``,
``user_id``, and ``config``.
Returns:
str: A JSON string with ``success`` and, on success, a ``message``,
``service``, and masked ``key_preview`` (plus a ``warning`` when the key
validated but is rate-limited); on failure, an ``error``.
"""
service = service.strip().lower()
api_key = api_key.strip()
r = await _get_redis(ctx)
uid = ctx.user_id
config = getattr(ctx, "config", None)
if not await has_privilege(r, uid, PRIVILEGES["STARGAZER_USE"], config):
return json.dumps(
{"success": False, "error": "Access denied (STARGAZER_USE required)."}
)
if not service or not all(c.isalnum() or c in "_-" for c in service):
return json.dumps(
{
"success": False,
"error": "Service name must be non-empty alphanumeric (hyphens/underscores allowed).",
}
)
if not api_key or len(api_key) < 8:
return json.dumps(
{
"success": False,
"error": "API key is too short or empty.",
}
)
max_len = (
MAX_KEY_LENGTH_EXTENDED
if service in EXTENDED_LIMIT_SERVICES
else MAX_KEY_LENGTH_DEFAULT
)
if len(api_key) > max_len:
return json.dumps(
{
"success": False,
"error": f"API key exceeds maximum length ({max_len} chars).",
}
)
validation = None
if service == "brave":
validation = await _validate_brave_key(api_key)
if not validation["valid"]:
return json.dumps(
{
"success": False,
"error": f"Key validation failed: {validation['error']}",
"service": service,
}
)
master_key = resolve_master_key()
if master_key is None:
logger.warning("API_KEY_MASTER_KEY not set; refusing to store new API key")
return json.dumps(
{
"success": False,
"error": (
"API key encryption is not configured. Set API_KEY_MASTER_KEY "
"(base64-encoded 32-byte key) in the environment to store API keys securely."
),
}
)
r = await _get_redis(ctx)
uid = ctx.user_id
sqlite_path = _encryption_db_path(ctx)
user_key = await get_or_create_user_key(uid, sqlite_path, master_key)
encrypted = encrypt(api_key, user_key)
redis_key = _redis_key(uid)
await r.hset(redis_key, service, encrypted)
display_name = KNOWN_SERVICES.get(service, {}).get("display_name", service)
result = {
"success": True,
"message": f"Your {display_name} API key has been saved securely.",
"service": service,
"key_preview": _mask_key(api_key),
}
if validation and validation.get("warning"):
result["warning"] = validation["warning"]
logger.info("User %s stored API key for service '%s'", uid, service)
return json.dumps(result)
async def _remove_user_api_key(
service: str,
ctx: ToolContext | None = None,
) -> str:
"""Delete one user's personal API key for a service.
Handler backing the ``remove_user_api_key`` tool. After gating on the
``STARGAZER_USE`` privilege (``has_privilege`` against Redis), it HDELs the
service field from the user's per-user hash at ``_redis_key(uid)`` and logs
the removal. Reports distinct outcomes for "a key was actually removed"
versus "no key was stored", so the LLM can phrase the reply accurately.
Does not touch the global or channel pools or the SQLite key store.
Dispatched by ``tool_loader.py`` via the ``TOOLS`` registry's ``handler``
entry; not called directly elsewhere in the repo.
Args:
service (str): The service whose stored key should be removed.
ctx (ToolContext | None): The tool context carrying ``redis``,
``user_id``, and ``config``.
Returns:
str: A JSON string with ``success``, a ``deleted`` flag, and a
human-readable ``message`` (or an ``error`` on access denial).
"""
service = service.strip().lower()
r = await _get_redis(ctx)
uid = ctx.user_id
config = getattr(ctx, "config", None)
if not await has_privilege(r, uid, PRIVILEGES["STARGAZER_USE"], config):
return json.dumps(
{"success": False, "error": "Access denied (STARGAZER_USE required)."}
)
key = _redis_key(uid)
deleted = await r.hdel(key, service)
display_name = KNOWN_SERVICES.get(service, {}).get("display_name", service)
if deleted:
logger.info("User %s removed API key for service '%s'", uid, service)
return json.dumps(
{
"success": True,
"deleted": True,
"message": f"Your {display_name} API key has been removed.",
}
)
return json.dumps(
{
"success": True,
"deleted": False,
"message": f"No API key found for {display_name}.",
}
)
async def _list_user_api_keys(
ctx: ToolContext | None = None,
) -> str:
"""List the services a user has stored keys for, with masked previews.
Handler backing the ``list_user_api_keys`` tool. After gating on the
``STARGAZER_USE`` privilege, it HGETALLs the user's per-user hash at
``_redis_key(uid)`` and, for each entry, decrypts the stored blob when it is
encrypted (loading the per-user key via ``get_or_create_user_key`` against
the SQLite key store and calling ``decrypt``), falling back to the raw value
for legacy plaintext or when decryption fails. Only a masked preview from
``_mask_key`` and the service display name are returned -- full keys are
never emitted. Reads Redis and the SQLite key store; no network calls.
Dispatched by ``tool_loader.py`` via the ``TOOLS`` registry's ``handler``
entry; not called directly elsewhere in the repo.
Args:
ctx (ToolContext | None): The tool context carrying ``redis``,
``user_id``, and ``config``.
Returns:
str: A JSON string with ``success``, a ``keys`` map of service to
display name and masked preview, and a ``count`` (or an ``error`` on
access denial).
"""
r = await _get_redis(ctx)
uid = ctx.user_id
config = getattr(ctx, "config", None)
if not await has_privilege(r, uid, PRIVILEGES["STARGAZER_USE"], config):
return json.dumps(
{"success": False, "error": "Access denied (STARGAZER_USE required)."}
)
key = _redis_key(uid)
all_keys = await r.hgetall(key)
if not all_keys:
return json.dumps(
{
"success": True,
"keys": {},
"count": 0,
"message": "No API keys stored.",
}
)
master_key = resolve_master_key()
sqlite_path = _encryption_db_path(ctx)
keys_info = {}
for svc, raw_key in all_keys.items():
svc_str = svc if isinstance(svc, str) else svc.decode()
raw_str = raw_key if isinstance(raw_key, str) else raw_key.decode()
if is_encrypted(raw_str) and master_key:
try:
user_key = await get_or_create_user_key(uid, sqlite_path, master_key)
plaintext = decrypt(raw_str, user_key)
except Exception as e:
logger.warning(
"Failed to decrypt API key for %s/%s: %s", uid, svc_str, e
)
plaintext = raw_str # fallback to masked raw for preview
else:
plaintext = raw_str
svc_meta = KNOWN_SERVICES.get(svc_str, {})
keys_info[svc_str] = {
"display_name": svc_meta.get("display_name", svc_str),
"key_preview": _mask_key(plaintext),
}
return json.dumps(
{
"success": True,
"keys": keys_info,
"count": len(keys_info),
}
)
# ---------------------------------------------------------------
# Pool tool handlers (shared internals + 6 public handlers)
# ---------------------------------------------------------------
def _validate_service_name(service: str) -> str | None:
"""Validate a pool service name, returning an error message or ``None``.
Enforces that a service identifier is non-empty and limited to alphanumeric
characters plus hyphen and underscore, mirroring the same check inlined in
``_set_user_api_key`` so pool donations cannot create oddly named or
injection-prone Redis fields. Pure validation with no I/O.
Called by the shared ``_donate_to_pool`` in this module before a key is
added to any global or channel pool.
Args:
service (str): The candidate service identifier.
Returns:
str | None: An error message when invalid, otherwise ``None``.
"""
if not service or not all(c.isalnum() or c in "_-" for c in service):
return (
"Service name must be non-empty alphanumeric (hyphens/underscores allowed)."
)
return None
def _validate_api_key_format(api_key: str, service: str = "") -> str | None:
"""Validate a pool API key's length, returning an error message or ``None``.
Rejects keys shorter than 8 characters and enforces a per-service maximum
length -- ``MAX_KEY_LENGTH_EXTENDED`` for services in
``EXTENDED_LIMIT_SERVICES`` (which may store large JSON credential blobs) and
``MAX_KEY_LENGTH_DEFAULT`` otherwise -- so oversized values cannot bloat the
pool hash. Pure validation with no I/O.
Called by the shared ``_donate_to_pool`` in this module before a key is
added to any global or channel pool.
Args:
api_key (str): The candidate API key.
service (str): The service name, used to pick the length ceiling.
Returns:
str | None: An error message when invalid, otherwise ``None``.
"""
if not api_key or len(api_key) < 8:
return "API key is too short or empty."
max_len = (
MAX_KEY_LENGTH_EXTENDED
if service in EXTENDED_LIMIT_SERVICES
else MAX_KEY_LENGTH_DEFAULT
)
if len(api_key) > max_len:
return f"API key exceeds maximum length ({max_len} chars)."
return None
async def _donate_to_pool(
redis_key: str,
service: str,
api_key: str,
donor_id: str,
*,
redis_client,
pool_label: str,
) -> str:
"""Add an encrypted key to a shared pool hash, recording its donor.
Common implementation behind both the global and channel donate handlers.
It validates the service name and key format
(``_validate_service_name``/``_validate_api_key_format``), runs the live
``brave`` check via ``_validate_brave_key`` when applicable, and refuses to
proceed when ``API_KEY_MASTER_KEY`` is unset (``resolve_master_key`` returns
``None``). Keys are deduplicated by their ``api_key_hash`` (also checking the
legacy raw-key field) so the same key is never stored twice, then the key is
encrypted with the shared pool key (``get_pool_key`` plus ``encrypt``) and
HSET into ``redis_key`` under that hash alongside donor id and an ISO
``donated_at`` timestamp. Logs the donation and reports the new pool size via
HLEN. Reads and writes Redis; for ``brave`` it also makes one outbound HTTP
call.
Called by ``_donate_api_key_to_pool`` and ``_donate_api_key_to_channel_pool``
in this module.
Args:
redis_key (str): The pool hash key (from ``_pool_key`` or
``_channel_pool_key``).
service (str): The service the key is for.
api_key (str): The plaintext key being donated.
donor_id (str): The donating user's id, stored so only they can later
withdraw it.
redis_client: The Redis client to operate on.
pool_label (str): Human-readable pool name used in messages and logs.
Returns:
str: A JSON string with ``success`` and, on success, a ``message``,
``service``, masked ``key_preview``, and ``pool_size``; on failure, an
``error``.
"""
service = service.strip().lower()
api_key = api_key.strip()
err = _validate_service_name(service)
if err:
return json.dumps({"success": False, "error": err})
err = _validate_api_key_format(api_key, service)
if err:
return json.dumps({"success": False, "error": err})
if service == "brave":
validation = await _validate_brave_key(api_key)
if not validation["valid"]:
return json.dumps(
{
"success": False,
"error": f"Key validation failed: {validation['error']}",
"service": service,
}
)
master_key = resolve_master_key()
if master_key is None:
logger.warning("API_KEY_MASTER_KEY not set; refusing to donate to pool")
return json.dumps(
{
"success": False,
"error": (
"API key encryption is not configured. Set API_KEY_MASTER_KEY "
"in the environment to donate keys to the pool."
),
}
)
field_hash = api_key_hash(api_key)
already = await redis_client.hexists(redis_key, field_hash)
if not already:
# Also check legacy format (raw api_key as field)
already = await redis_client.hexists(redis_key, api_key)
if already:
return json.dumps(
{
"success": False,
"error": f"This key is already in the {pool_label}.",
}
)
pool_key = get_pool_key(master_key)
encrypted_blob = encrypt(api_key, pool_key)
metadata = {
"encrypted_key": encrypted_blob,
"donor_id": str(donor_id),
"donated_at": datetime.now(timezone.utc).isoformat(),
}
await redis_client.hset(redis_key, field_hash, json.dumps(metadata))
display = KNOWN_SERVICES.get(service, {}).get("display_name", service)
count = await redis_client.hlen(redis_key)
logger.info(
"User %s donated %s key to %s (now %d)", donor_id, service, pool_label, count
)
return json.dumps(
{
"success": True,
"message": f"Your {display} key has been added to the {pool_label}. Thank you!",
"service": service,
"key_preview": _mask_key(api_key),
"pool_size": count,
}
)
async def _withdraw_from_pool(
redis_key: str,
service: str,
api_key: str,
caller_id: str,
*,
redis_client,
pool_label: str,
) -> str:
"""Remove a previously donated key from a shared pool, donor-gated.
Common implementation behind both the global and channel withdraw handlers.
It locates the stored entry by ``api_key_hash`` and, for backward
compatibility, by the legacy raw-key field, then parses the stored JSON
metadata and enforces that only the original ``donor_id`` may withdraw it.
On success it HDELs the field and logs the withdrawal. Reads and writes
Redis; no network or filesystem access.
Called by ``_withdraw_api_key_from_pool`` and
``_withdraw_api_key_from_channel_pool`` in this module.
Args:
redis_key (str): The pool hash key (from ``_pool_key`` or
``_channel_pool_key``).
service (str): The service the key is for (used for display only).
api_key (str): The exact plaintext key to withdraw.
caller_id (str): The requesting user's id, checked against the recorded
donor.
redis_client: The Redis client to operate on.
pool_label (str): Human-readable pool name used in messages and logs.
Returns:
str: A JSON string with ``success`` and, on success, a ``message``,
``service``, and masked ``key_preview``; on failure (not found or not
the donor), an ``error``.
"""
service = service.strip().lower()
api_key = api_key.strip()
# Try new format (hash as field) first, then legacy (raw api_key as field)
field_hash = api_key_hash(api_key)
raw = await redis_client.hget(redis_key, field_hash)
field_to_delete = field_hash
if raw is None:
raw = await redis_client.hget(redis_key, api_key)
field_to_delete = api_key
if raw is None:
return json.dumps(
{
"success": False,
"error": f"That key was not found in the {pool_label}.",
}
)
meta_str = raw if isinstance(raw, str) else raw.decode()
try:
meta = json.loads(meta_str)
except json.JSONDecodeError:
meta = {}
if meta.get("donor_id") != str(caller_id):
return json.dumps(
{
"success": False,
"error": "Only the original donor can withdraw this key.",
}
)
await redis_client.hdel(redis_key, field_to_delete)
display = KNOWN_SERVICES.get(service, {}).get("display_name", service)
logger.info("User %s withdrew %s key from %s", caller_id, service, pool_label)
return json.dumps(
{
"success": True,
"message": f"Your {display} key has been removed from the {pool_label}.",
"service": service,
"key_preview": _mask_key(api_key),
}
)
async def _pool_status(
prefix: str,
suffix_pattern: str,
service: str | None,
*,
redis_client,
pool_label: str,
) -> str:
"""Report key counts for one service or every service in a pool.
Common implementation behind both the global and channel pool-status
handlers. When a ``service`` is given it HLENs that single pool hash; when
omitted it SCANs Redis for every matching pool hash (substituting ``*`` for
the service placeholder in ``suffix_pattern``) and HLENs each, building a
per-service count map. Only counts and display names are returned -- actual
keys are never read or revealed. Reads Redis only.
Called by ``_get_pool_status`` (global) and ``_get_channel_pool_status``
(channel) in this module.
Args:
prefix (str): The pool key prefix (global or channel).
suffix_pattern (str): The key suffix template containing the literal
``{service}`` placeholder.
service (str | None): A specific service to report, or ``None`` for all.
redis_client: The Redis client to operate on.
pool_label (str): Human-readable pool name echoed back in the result.
Returns:
str: A JSON string with ``success`` and either a single-service
``key_count`` or a ``services`` map plus ``total_services``.
"""
if service:
service = service.strip().lower()
rkey = f"{prefix}:{suffix_pattern}".replace("{service}", service)
count = await redis_client.hlen(rkey)
display = KNOWN_SERVICES.get(service, {}).get("display_name", service)
return json.dumps(
{
"success": True,
"pool": pool_label,
"service": service,
"display_name": display,
"key_count": count,
}
)
cursor = b"0"
scan_pattern = f"{prefix}:{suffix_pattern}".replace("{service}", "*")
pools: dict[str, int] = {}
while True:
cursor, keys = await redis_client.scan(
cursor=cursor, match=scan_pattern, count=200
)
for k in keys:
k_str = k if isinstance(k, str) else k.decode()
svc = k_str.rsplit(":", 1)[-1]
pools[svc] = await redis_client.hlen(k_str)
if cursor == 0 or cursor == b"0":
break
return json.dumps(
{
"success": True,
"pool": pool_label,
"services": {
svc: {
"display_name": KNOWN_SERVICES.get(svc, {}).get(
"display_name", svc
),
"key_count": cnt,
}
for svc, cnt in sorted(pools.items())
},
"total_services": len(pools),
}
)
# -- Global pool handlers --
async def _donate_api_key_to_pool(
service: str,
api_key: str,
ctx: ToolContext | None = None,
) -> str:
"""Donate an API key to the globally shared pool for a service.
Handler backing the ``donate_api_key_to_pool`` tool. It resolves the Redis
client from ``ctx`` and delegates to the shared ``_donate_to_pool`` against
the global pool hash named by ``_pool_key(service)``, recording the caller
as donor so they can later withdraw it. All validation, encryption, dedup,
and the ``brave`` live check happen inside ``_donate_to_pool``.
Dispatched by ``tool_loader.py`` via the ``TOOLS`` registry's ``handler``
entry; not called directly elsewhere in the repo.
Args:
service (str): The service the donated key is for.
api_key (str): The plaintext key to add to the global pool.
ctx (ToolContext | None): The tool context carrying ``redis`` and
``user_id``.
Returns:
str: The JSON result string produced by ``_donate_to_pool``.
"""
r = await _get_redis(ctx)
service = service.strip().lower()
return await _donate_to_pool(
_pool_key(service),
service,
api_key,
ctx.user_id,
redis_client=r,
pool_label="global pool",
)
async def _withdraw_api_key_from_pool(
service: str,
api_key: str,
ctx: ToolContext | None = None,
) -> str:
"""Withdraw the caller's own donated key from the global shared pool.
Handler backing the ``withdraw_api_key_from_pool`` tool. It resolves the
Redis client from ``ctx`` and delegates to the shared ``_withdraw_from_pool``
against the global pool hash named by ``_pool_key(service)``; the donor-only
enforcement lives in ``_withdraw_from_pool``, so a user can remove only a key
they originally donated.
Dispatched by ``tool_loader.py`` via the ``TOOLS`` registry's ``handler``
entry; not called directly elsewhere in the repo.
Args:
service (str): The service whose key should be withdrawn.
api_key (str): The exact plaintext key to remove from the global pool.
ctx (ToolContext | None): The tool context carrying ``redis`` and
``user_id``.
Returns:
str: The JSON result string produced by ``_withdraw_from_pool``.
"""
r = await _get_redis(ctx)
service = service.strip().lower()
return await _withdraw_from_pool(
_pool_key(service),
service,
api_key,
ctx.user_id,
redis_client=r,
pool_label="global pool",
)
async def _get_pool_status(
service: str = "",
ctx: ToolContext | None = None,
) -> str:
"""Report how many keys the global pool holds, overall or per service.
Handler backing the ``get_pool_status`` tool. It resolves the Redis client
from ``ctx`` and delegates to the shared ``_pool_status`` over the
``GLOBAL_POOL_PREFIX`` namespace, passing the specific service when given or
``None`` to summarize every service. Returns only counts -- never actual
keys.
Dispatched by ``tool_loader.py`` via the ``TOOLS`` registry's ``handler``
entry; not called directly elsewhere in the repo.
Args:
service (str): An optional specific service to report; empty means all.
ctx (ToolContext | None): The tool context carrying ``redis``.
Returns:
str: The JSON status string produced by ``_pool_status``.
"""
r = await _get_redis(ctx)
return await _pool_status(
GLOBAL_POOL_PREFIX,
"{service}",
service or None,
redis_client=r,
pool_label="global pool",
)
# -- Channel pool handlers --
async def _donate_api_key_to_channel_pool(
service: str,
api_key: str,
ctx: ToolContext | None = None,
) -> str:
"""Donate an API key to the current channel's shared pool.
Handler backing the ``donate_api_key_to_channel_pool`` tool. It resolves the
Redis client and the current ``ctx.channel_id`` (erroring if no channel
context is present), then delegates to the shared ``_donate_to_pool`` against
the channel-scoped hash named by ``_channel_pool_key(ch, service)`` so the
key benefits only users of this channel. Validation, encryption, and dedup
happen inside ``_donate_to_pool``.
Dispatched by ``tool_loader.py`` via the ``TOOLS`` registry's ``handler``
entry; not called directly elsewhere in the repo.
Args:
service (str): The service the donated key is for.
api_key (str): The plaintext key to add to the channel pool.
ctx (ToolContext | None): The tool context carrying ``redis``,
``user_id``, and ``channel_id``.
Returns:
str: A JSON error when no channel context exists, otherwise the result
string produced by ``_donate_to_pool``.
"""
r = await _get_redis(ctx)
ch = ctx.channel_id
if not ch:
return json.dumps({"success": False, "error": "No channel context available."})
service = service.strip().lower()
return await _donate_to_pool(
_channel_pool_key(ch, service),
service,
api_key,
ctx.user_id,
redis_client=r,
pool_label=f"channel pool (channel {ch})",
)
async def _withdraw_api_key_from_channel_pool(
service: str,
api_key: str,
ctx: ToolContext | None = None,
) -> str:
"""Withdraw the caller's own donated key from the current channel pool.
Handler backing the ``withdraw_api_key_from_channel_pool`` tool. It resolves
the Redis client and the current ``ctx.channel_id`` (erroring if absent),
then delegates to the shared ``_withdraw_from_pool`` against the
channel-scoped hash named by ``_channel_pool_key(ch, service)``; the
donor-only check inside ``_withdraw_from_pool`` ensures a user can remove
only a key they donated to this channel.
Dispatched by ``tool_loader.py`` via the ``TOOLS`` registry's ``handler``
entry; not called directly elsewhere in the repo.
Args:
service (str): The service whose key should be withdrawn.
api_key (str): The exact plaintext key to remove from the channel pool.
ctx (ToolContext | None): The tool context carrying ``redis``,
``user_id``, and ``channel_id``.
Returns:
str: A JSON error when no channel context exists, otherwise the result
string produced by ``_withdraw_from_pool``.
"""
r = await _get_redis(ctx)
ch = ctx.channel_id
if not ch:
return json.dumps({"success": False, "error": "No channel context available."})
service = service.strip().lower()
return await _withdraw_from_pool(
_channel_pool_key(ch, service),
service,
api_key,
ctx.user_id,
redis_client=r,
pool_label=f"channel pool (channel {ch})",
)
async def _get_channel_pool_status(
service: str = "",
ctx: ToolContext | None = None,
) -> str:
"""Report how many keys the current channel's pool holds.
Handler backing the ``get_channel_pool_status`` tool. It resolves the Redis
client and the current ``ctx.channel_id`` (erroring if absent), then
delegates to the shared ``_pool_status`` over the ``CHANNEL_POOL_PREFIX``
namespace scoped to this channel, reporting a single service when given or
summarizing all of the channel's services otherwise. Returns only counts --
never actual keys.
Dispatched by ``tool_loader.py`` via the ``TOOLS`` registry's ``handler``
entry; not called directly elsewhere in the repo.
Args:
service (str): An optional specific service to report; empty means all.
ctx (ToolContext | None): The tool context carrying ``redis`` and
``channel_id``.
Returns:
str: A JSON error when no channel context exists, otherwise the status
string produced by ``_pool_status``.
"""
r = await _get_redis(ctx)
ch = ctx.channel_id
if not ch:
return json.dumps({"success": False, "error": "No channel context available."})
return await _pool_status(
CHANNEL_POOL_PREFIX,
f"{ch}:{{service}}",
service.strip().lower() or None,
redis_client=r,
pool_label=f"channel pool (channel {ch})",
)
# ---------------------------------------------------------------
# Pool helpers (public)
# ---------------------------------------------------------------
async def _get_pool_api_key_impl(
redis_key: str,
*,
redis_client,
) -> str | None:
"""Pick a random key from a pool hash, decrypting it when encrypted.
Shared reader behind both global and channel pool lookups. It uses Redis
``HRANDFIELD`` to choose a random entry (load-spreading across donated keys),
HGETs its metadata, and parses the stored JSON: a modern entry carries an
``encrypted_key`` blob that is decrypted with the shared pool key
(``get_pool_key`` plus ``decrypt``) once ``resolve_master_key`` yields the
master key, while a legacy entry stored the raw key as the field name and is
returned directly. Reads Redis; performs no writes. Exceptions propagate to
the caller, which logs and degrades to ``None``.
Called by ``get_pool_api_key`` and ``get_channel_pool_api_key`` in this
module.
Args:
redis_key (str): The pool hash key to draw from.
redis_client: The Redis client to operate on.
Returns:
str | None: A usable plaintext API key, or ``None`` when the pool is
empty or the master key is required but unavailable.
"""
try:
field = await redis_client.hrandfield(redis_key)
if field is None:
return None
field_str = field if isinstance(field, str) else field.decode()
raw = await redis_client.hget(redis_key, field_str)
if raw is None:
return None
val_str = raw if isinstance(raw, str) else raw.decode()
try:
meta = json.loads(val_str)
except json.JSONDecodeError:
return field_str # legacy: field was the raw api_key
if "encrypted_key" in meta:
master_key = resolve_master_key()
if master_key:
pool_key = get_pool_key(master_key)
return decrypt(meta["encrypted_key"], pool_key)
return field_str # legacy: field was the raw api_key
except Exception:
raise
[docs]
async def get_pool_api_key(
service: str,
*,
redis_client=None,
) -> str | None:
"""Return a random shared key from the global pool for a service.
Public helper used during key resolution: it names the global pool hash via
``_pool_key(service)`` and draws a decrypted key through
``_get_pool_api_key_impl``. Returns ``None`` (rather than raising) when no
Redis client is supplied or anything fails, logging the failure so a degraded
pool never breaks the calling tool. Reads Redis only.
Called by ``get_user_api_key`` in this module as the final fallback in the
resolution chain; not invoked directly by other modules.
Args:
service (str): The service to draw a pooled key for.
redis_client: The Redis client to use, or ``None`` to skip the pool.
Returns:
str | None: A pooled plaintext API key, or ``None`` if unavailable.
"""
if not redis_client:
return None
try:
return await _get_pool_api_key_impl(
_pool_key(service),
redis_client=redis_client,
)
except Exception:
logger.exception("Failed to fetch global pool key for %s", service)
return None
[docs]
async def get_channel_pool_api_key(
channel_id: str,
service: str,
*,
redis_client=None,
) -> str | None:
"""Return a random shared key from a channel's pool for a service.
Public helper used during key resolution: it names the channel-scoped pool
hash via ``_channel_pool_key(channel_id, service)`` and draws a decrypted
key through ``_get_pool_api_key_impl``. Returns ``None`` (rather than
raising) when no Redis client or channel id is supplied or anything fails,
logging the failure so a degraded pool never breaks the calling tool. Reads
Redis only.
Called by ``get_user_api_key`` in this module, preferred over the global
pool when a channel id is in scope; not invoked directly by other modules.
Args:
channel_id (str): The channel whose pool to draw from.
service (str): The service to draw a pooled key for.
redis_client: The Redis client to use, or ``None`` to skip the pool.
Returns:
str | None: A pooled plaintext API key, or ``None`` if unavailable.
"""
if not redis_client or not channel_id:
return None
try:
return await _get_pool_api_key_impl(
_channel_pool_key(channel_id, service),
redis_client=redis_client,
)
except Exception:
logger.exception(
"Failed to fetch channel pool key for %s/%s", channel_id, service
)
return None
# ---------------------------------------------------------------
# Public helper for other tools (e.g. brave_search)
# ---------------------------------------------------------------
[docs]
async def get_user_api_key(
user_id: str,
service: str,
*,
redis_client=None,
channel_id: str | None = None,
fallback_to_pool: bool = True,
config=None,
) -> str | None:
"""Return the best available API key for *service*.
Resolution order: user key -> channel pool -> global pool.
"""
if not redis_client:
return None
# 1. Per-user key
if user_id:
try:
val = await redis_client.hget(_redis_key(user_id), service)
if val is not None:
raw_str = val if isinstance(val, str) else val.decode()
if is_encrypted(raw_str):
master_key = resolve_master_key()
if not master_key:
logger.warning(
"Encrypted API key found but API_KEY_MASTER_KEY not set"
)
return None
sqlite_path = (
getattr(config, "api_key_encryption_db_path", None)
or _DEFAULT_ENCRYPTION_DB_PATH
)
try:
user_key = await get_or_create_user_key(
user_id, sqlite_path, master_key
)
return decrypt(raw_str, user_key)
except Exception:
logger.exception(
"Failed to decrypt user API key for %s/%s",
user_id,
service,
)
return None
return raw_str # legacy plaintext
except Exception:
logger.exception("Failed to fetch user API key for %s/%s", user_id, service)
# 2. Channel pool
if fallback_to_pool:
if channel_id:
ch_key = await get_channel_pool_api_key(
channel_id,
service,
redis_client=redis_client,
)
if ch_key:
return ch_key
# 3. Global pool
return await get_pool_api_key(service, redis_client=redis_client)
return None
# ---------------------------------------------------------------
# Multi-tool registration
# ---------------------------------------------------------------
TOOLS = [
{
"name": "set_user_api_key",
"description": (
"Store a personal API key for an external service. "
"The key is stored securely per-user and used "
"automatically when the corresponding tool runs. "
"IMPORTANT: For privacy, recommend the user send "
"their API key via DM rather than in a public channel. "
"Known services: brave, elevenlabs, openrouter, "
"gemini, civitai, vultr, sporestack, gandi, cloudflare, aws, gcp, oci, gitea, cursor. "
"Any other service name is also accepted for "
"future or third-party integrations. "
"Signup links -- Brave Search: "
"https://brave.com/search/api/ (free 2k queries/mo), "
"ElevenLabs: https://elevenlabs.io/, "
"OpenRouter: https://openrouter.ai/keys, "
"Google Gemini: https://aistudio.google.com/apikey, "
"CivitAI: https://civitai.com/user/account, "
"Cloudflare: https://dash.cloudflare.com/profile/api-tokens, "
'AWS: store JSON {"access_key_id": "...", '
'"secret_access_key": "...", "region": "us-east-1"}, '
"GCP: store the full service account JSON, "
'OCI: store JSON {"tenancy": "...", "user": "...", '
'"fingerprint": "...", "key_content": "...", '
'"region": "us-ashburn-1"}. '
"Gitea: token only (uses gitea.com) or JSON "
'{"token": "...", "base_url": "https://your-gitea.example.com"}, '
"Cursor IDE: https://cursor.com/dashboard/cloud-agents (User API Keys, "
"format crsr_...; powers send_cursor_prompt, import_mcp_tool, create_workflow)"
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": (
"The service to store the key for "
"(e.g. 'brave', 'elevenlabs', "
"'openrouter', 'gemini', etc.)."
),
},
"api_key": {
"type": "string",
"description": "The API key to store.",
},
},
"required": ["service", "api_key"],
},
"handler": _set_user_api_key,
},
{
"name": "remove_user_api_key",
"description": (
"Remove a previously stored personal API key " "for an external service."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": (
"The service whose key should be removed "
"(e.g. 'brave', 'elevenlabs')."
),
},
},
"required": ["service"],
},
"handler": _remove_user_api_key,
},
{
"name": "list_user_api_keys",
"description": (
"List which external services the user has "
"stored API keys for. Shows masked key previews, "
"never full keys."
),
"parameters": {
"type": "object",
"properties": {},
},
"handler": _list_user_api_keys,
},
# ----- Global pool tools -----
{
"name": "donate_api_key_to_pool",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks to donate or contribute a key to the shared/global "
"pool. By default, use set_user_api_key instead. "
"This tool adds an API key to a globally-shared pool that "
"all users across all channels benefit from. The donor can "
"withdraw their key later with withdraw_api_key_from_pool. "
"Keys are validated before being added (for supported services). "
"IMPORTANT: For privacy, recommend the user send "
"their API key via DM rather than in a public channel."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "The service to donate the key for (e.g. 'brave', 'elevenlabs').",
},
"api_key": {
"type": "string",
"description": "The API key to donate to the global pool.",
},
},
"required": ["service", "api_key"],
},
"handler": _donate_api_key_to_pool,
},
{
"name": "withdraw_api_key_from_pool",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks to withdraw or remove their key from the shared/global "
"pool. Only the original donor can withdraw a key. "
"By default, use remove_user_api_key for personal keys."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "The service whose key to withdraw (e.g. 'brave').",
},
"api_key": {
"type": "string",
"description": "The exact API key to withdraw from the pool.",
},
},
"required": ["service", "api_key"],
},
"handler": _withdraw_api_key_from_pool,
},
{
"name": "get_pool_status",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks about the shared/global API key pool. "
"Shows how many keys are in the global pool per service. "
"Never reveals actual keys."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "Optional: a specific service to check. Omit to list all services.",
},
},
},
"handler": _get_pool_status,
},
# ----- Channel pool tools -----
{
"name": "donate_api_key_to_channel_pool",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks to donate or contribute a key to this channel's "
"shared pool. By default, use set_user_api_key instead. "
"This tool adds an API key to a pool shared only by users "
"in the current channel. The donor can withdraw their key "
"later with withdraw_api_key_from_channel_pool. "
"Keys are validated before being added (for supported services). "
"IMPORTANT: For privacy, recommend the user send "
"their API key via DM rather than in a public channel."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "The service to donate the key for (e.g. 'brave', 'elevenlabs').",
},
"api_key": {
"type": "string",
"description": "The API key to donate to the channel pool.",
},
},
"required": ["service", "api_key"],
},
"handler": _donate_api_key_to_channel_pool,
},
{
"name": "withdraw_api_key_from_channel_pool",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks to withdraw or remove their key from this channel's "
"shared pool. Only the original donor can withdraw a key. "
"By default, use remove_user_api_key for personal keys."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "The service whose key to withdraw (e.g. 'brave').",
},
"api_key": {
"type": "string",
"description": "The exact API key to withdraw from the channel pool.",
},
},
"required": ["service", "api_key"],
},
"handler": _withdraw_api_key_from_channel_pool,
},
{
"name": "get_channel_pool_status",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks about this channel's shared API key pool. "
"Shows how many keys are in the current channel's pool "
"per service. Never reveals actual keys."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "Optional: a specific service to check. Omit to list all services.",
},
},
},
"handler": _get_channel_pool_status,
},
]