"""Per-user API key management + global/channel API key pools (v4)
Per-user keys follow the user across channels and servers.
Pool keys are shared -- either globally or scoped to a single channel.
Resolution order: user key -> channel pool -> global pool -> env/config
Redis key patterns:
stargazer:user_api_keys:{user_id} (per-user hash)
stargazer:global_api_pool:{service} (global pool hash)
stargazer:channel_api_pool:{channel_id}:{service} (channel pool hash)
API keys are encrypted at rest (AES-256-GCM) with per-user keys stored in SQLite.
"""
from __future__ import annotations
import aiohttp
import json
import logging
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from api_key_encryption import (
ENCRYPTED_PREFIX,
api_key_hash,
decrypt,
encrypt,
get_or_create_user_key,
get_pool_key,
is_encrypted,
resolve_master_key,
)
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
_DEFAULT_ENCRYPTION_DB_PATH = "data/api_key_encryption_keys.db"
REDIS_KEY_PREFIX = "stargazer:user_api_keys"
GLOBAL_POOL_PREFIX = "stargazer:global_api_pool"
CHANNEL_POOL_PREFIX = "stargazer:channel_api_pool"
KNOWN_SERVICES = {
"brave": {"display_name": "Brave Search", "signup_url": "https://brave.com/search/api/"},
"elevenlabs": {"display_name": "ElevenLabs", "signup_url": "https://elevenlabs.io/"},
"openrouter": {"display_name": "OpenRouter", "signup_url": "https://openrouter.ai/keys"},
"gemini": {"display_name": "Google Gemini", "signup_url": "https://aistudio.google.com/apikey"},
"civitai": {"display_name": "CivitAI", "signup_url": "https://civitai.com/user/account"},
"vultr": {"display_name": "Vultr", "signup_url": "https://my.vultr.com/settings/#settingsapi"},
"gandi": {"display_name": "Gandi DNS", "signup_url": "https://account.gandi.net/"},
"cloudflare": {"display_name": "Cloudflare", "signup_url": "https://dash.cloudflare.com/profile/api-tokens"},
"aws": {"display_name": "Amazon Web Services", "signup_url": "https://console.aws.amazon.com/iam/"},
"gcp": {"display_name": "Google Cloud", "signup_url": "https://console.cloud.google.com/iam-admin/serviceaccounts"},
"oci": {"display_name": "Oracle Cloud", "signup_url": "https://cloud.oracle.com/"},
"yt_dlp_cookies": {
"display_name": "yt-dlp Cookies",
"signup_url": "https://github.com/yt-dlp/yt-dlp/wiki/FAQ#how-do-i-pass-cookies-to-yt-dlp",
},
"gitea": {
"display_name": "Gitea",
"signup_url": "https://docs.gitea.com/development/api-usage",
},
}
EXTENDED_LIMIT_SERVICES = {"gcp", "oci", "yt_dlp_cookies", "gitea"}
MAX_KEY_LENGTH_DEFAULT = 256
MAX_KEY_LENGTH_EXTENDED = 8192
BRAVE_SEARCH_URL = "https://api.search.brave.com/res/v1/web/search"
def _redis_key(user_id: str) -> str:
"""Internal helper: redis key.
Args:
user_id (str): Unique identifier for the user.
Returns:
str: Result string.
"""
return f"{REDIS_KEY_PREFIX}:{user_id}"
def _pool_key(service: str) -> str:
"""Internal helper: pool key.
Args:
service (str): The service value.
Returns:
str: Result string.
"""
return f"{GLOBAL_POOL_PREFIX}:{service}"
def _channel_pool_key(channel_id: str, service: str) -> str:
"""Internal helper: channel pool key.
Args:
channel_id (str): Discord/Matrix channel identifier.
service (str): The service value.
Returns:
str: Result string.
"""
return f"{CHANNEL_POOL_PREFIX}:{channel_id}:{service}"
def _mask_key(api_key: str) -> str:
"""Show first 4 and last 4 characters, mask the rest."""
if len(api_key) <= 10:
return api_key[:2] + "..." + api_key[-2:]
return api_key[:4] + "..." + api_key[-4:]
[docs]
def missing_api_key_error(service: str) -> str:
"""Return a verbose, actionable error message for a missing API key.
Intended to be called by consumer tools when ``get_user_api_key``
returns ``None``. The message includes the service display name,
signup URL (when known), and the exact ``set_user_api_key`` command
so the LLM can relay it to the user.
"""
info = KNOWN_SERVICES.get(service)
display = info["display_name"] if info else service
parts = [
f"{display} API key is not configured.",
"Notify the user that they need to:",
]
if info:
parts.append(f"1. Sign up / get an API key at: {info['signup_url']}")
parts.append(
f"2. Send it to the bot via DM with: set_user_api_key service={service} api_key=YOUR_KEY"
)
else:
parts.append(
f"1. Send their API key via DM with: set_user_api_key service={service} api_key=YOUR_KEY"
)
return " ".join(parts)
# ---------------------------------------------------------------------------
# Per-user daily rate limiting for default (non-user-supplied) API keys
# ---------------------------------------------------------------------------
_DEFAULT_KEY_USAGE_PREFIX = "stargazer:default_key_usage"
_DEFAULT_KEY_TTL = 86400 # 24 hours
[docs]
async def check_default_key_limit(
user_id: str,
tool_name: str,
redis_client,
daily_limit: int = 20,
) -> tuple[bool, int, int]:
"""Check whether *user_id* is within the daily default-key usage limit.
Returns ``(allowed, current_count, daily_limit)``.
"""
if redis_client is None:
return True, 0, daily_limit
key = f"{_DEFAULT_KEY_USAGE_PREFIX}:{user_id}:{tool_name}"
try:
raw = await redis_client.get(key)
current = int(raw) if raw else 0
except Exception:
return True, 0, daily_limit # fail-open
return current < daily_limit, current, daily_limit
[docs]
async def increment_default_key_usage(
user_id: str,
tool_name: str,
redis_client,
) -> None:
"""Increment the daily usage counter after a successful default-key call."""
if redis_client is None:
return
key = f"{_DEFAULT_KEY_USAGE_PREFIX}:{user_id}:{tool_name}"
try:
pipe = redis_client.pipeline()
pipe.incr(key)
pipe.expire(key, _DEFAULT_KEY_TTL)
await pipe.execute()
except Exception as exc:
logger.warning("Failed to increment default key usage for %s/%s: %s", user_id, tool_name, exc)
[docs]
def default_key_limit_error(tool_name: str, current: int, limit: int) -> str:
"""Return an error message when the daily default-key limit is exceeded."""
return (
f"Daily usage limit reached ({current}/{limit}) for {tool_name} using the shared API key. "
f"To continue using this tool, please provide your own API key. "
f"You can set one via DM: set_user_api_key service=gemini api_key=YOUR_KEY "
f"(or the appropriate service name). Your own key has no daily limit."
)
async def _get_redis(ctx):
"""Internal helper: get redis.
Args:
ctx: Tool execution context providing access to bot internals.
"""
r = getattr(ctx, "redis", None)
if r is None:
raise RuntimeError("Redis not available")
return r
def _encryption_db_path(ctx) -> str:
"""Return the SQLite path for encryption keys from config or default."""
config = getattr(ctx, "config", None)
if config and hasattr(config, "api_key_encryption_db_path"):
return config.api_key_encryption_db_path
return _DEFAULT_ENCRYPTION_DB_PATH
async def _validate_brave_key(api_key: str) -> dict:
"""Validate a Brave Search API key with a lightweight test query."""
params = {"q": "test", "count": 1}
headers = {
"Accept": "application/json",
"Accept-Encoding": "gzip",
"X-Subscription-Token": api_key,
}
try:
async with aiohttp.ClientSession() as session:
async with session.get(
BRAVE_SEARCH_URL,
headers=headers,
params=params,
timeout=aiohttp.ClientTimeout(total=15),
) as resp:
if resp.status == 200:
return {"valid": True}
elif resp.status == 401:
return {"valid": False, "error": "Invalid API key (401 Unauthorized)"}
elif resp.status == 402:
return {"valid": False, "error": "API key quota exhausted (402 Payment Required)"}
elif resp.status == 403:
return {"valid": False, "error": "API key forbidden (403) - check permissions"}
elif resp.status == 429:
return {"valid": True, "warning": "Key is valid but currently rate-limited; stored anyway"}
else:
return {"valid": False, "error": f"Unexpected response: HTTP {resp.status}"}
except aiohttp.ClientError as e:
return {"valid": False, "error": f"Network error during validation: {e}"}
# ---------------------------------------------------------------------------
# Gitea credential parsing (token + optional custom base_url)
# ---------------------------------------------------------------------------
GITEA_DEFAULT_BASE = "https://gitea.com"
def _parse_gitea_credential(raw: str) -> tuple[str, str]:
"""Parse Gitea credential: plain token or JSON with token and base_url.
Returns (token, base_url). base_url is normalized (no trailing slash).
"""
raw = raw.strip()
if raw.startswith("{"):
try:
data = json.loads(raw)
token = data.get("token", "").strip()
base = (data.get("base_url") or GITEA_DEFAULT_BASE).strip().rstrip("/")
if not base.startswith(("http://", "https://")):
base = "https://" + base
return token or "", base
except (json.JSONDecodeError, TypeError):
pass
return raw, GITEA_DEFAULT_BASE
[docs]
async def get_gitea_credentials(
user_id: str,
*,
redis_client=None,
channel_id: str | None = None,
fallback_to_pool: bool = True,
config=None,
) -> tuple[str, str] | None:
"""Return (token, base_url) for Gitea, or None.
Resolution order: user key -> channel pool -> global pool.
Parses plain token or JSON {\"token\": \"...\", \"base_url\": \"...\"}.
"""
raw = await get_user_api_key(
user_id,
"gitea",
redis_client=redis_client,
channel_id=channel_id,
fallback_to_pool=fallback_to_pool,
config=config,
)
if not raw:
return None
token, base_url = _parse_gitea_credential(raw)
if not token:
return None
return (token, base_url)
# ---------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------
async def _set_user_api_key(
service: str,
api_key: str,
ctx: ToolContext | None = None,
) -> str:
"""Internal helper: set user api key.
Args:
service (str): The service value.
api_key (str): The api key value.
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
service = service.strip().lower()
api_key = api_key.strip()
if not service or not all(c.isalnum() or c in "_-" for c in service):
return json.dumps({
"success": False,
"error": "Service name must be non-empty alphanumeric (hyphens/underscores allowed).",
})
if not api_key or len(api_key) < 8:
return json.dumps({
"success": False,
"error": "API key is too short or empty.",
})
max_len = MAX_KEY_LENGTH_EXTENDED if service in EXTENDED_LIMIT_SERVICES else MAX_KEY_LENGTH_DEFAULT
if len(api_key) > max_len:
return json.dumps({
"success": False,
"error": f"API key exceeds maximum length ({max_len} chars).",
})
validation = None
if service == "brave":
validation = await _validate_brave_key(api_key)
if not validation["valid"]:
return json.dumps({
"success": False,
"error": f"Key validation failed: {validation['error']}",
"service": service,
})
master_key = resolve_master_key()
if master_key is None:
logger.warning("API_KEY_MASTER_KEY not set; refusing to store new API key")
return json.dumps({
"success": False,
"error": (
"API key encryption is not configured. Set API_KEY_MASTER_KEY "
"(base64-encoded 32-byte key) in the environment to store API keys securely."
),
})
r = await _get_redis(ctx)
uid = ctx.user_id
sqlite_path = _encryption_db_path(ctx)
user_key = await get_or_create_user_key(uid, sqlite_path, master_key)
encrypted = encrypt(api_key, user_key)
redis_key = _redis_key(uid)
await r.hset(redis_key, service, encrypted)
display_name = KNOWN_SERVICES.get(service, {}).get("display_name", service)
result = {
"success": True,
"message": f"Your {display_name} API key has been saved securely.",
"service": service,
"key_preview": _mask_key(api_key),
}
if validation and validation.get("warning"):
result["warning"] = validation["warning"]
logger.info("User %s stored API key for service '%s'", uid, service)
return json.dumps(result)
async def _remove_user_api_key(
service: str,
ctx: ToolContext | None = None,
) -> str:
"""Internal helper: remove user api key.
Args:
service (str): The service value.
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
service = service.strip().lower()
r = await _get_redis(ctx)
uid = ctx.user_id
key = _redis_key(uid)
deleted = await r.hdel(key, service)
display_name = KNOWN_SERVICES.get(service, {}).get("display_name", service)
if deleted:
logger.info("User %s removed API key for service '%s'", uid, service)
return json.dumps({
"success": True,
"deleted": True,
"message": f"Your {display_name} API key has been removed.",
})
return json.dumps({
"success": True,
"deleted": False,
"message": f"No API key found for {display_name}.",
})
async def _list_user_api_keys(
ctx: ToolContext | None = None,
) -> str:
"""Internal helper: list user api keys.
Args:
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
r = await _get_redis(ctx)
uid = ctx.user_id
key = _redis_key(uid)
all_keys = await r.hgetall(key)
if not all_keys:
return json.dumps({
"success": True,
"keys": {},
"count": 0,
"message": "No API keys stored.",
})
master_key = resolve_master_key()
sqlite_path = _encryption_db_path(ctx)
keys_info = {}
for svc, raw_key in all_keys.items():
svc_str = svc if isinstance(svc, str) else svc.decode()
raw_str = raw_key if isinstance(raw_key, str) else raw_key.decode()
if is_encrypted(raw_str) and master_key:
try:
user_key = await get_or_create_user_key(uid, sqlite_path, master_key)
plaintext = decrypt(raw_str, user_key)
except Exception as e:
logger.warning("Failed to decrypt API key for %s/%s: %s", uid, svc_str, e)
plaintext = raw_str # fallback to masked raw for preview
else:
plaintext = raw_str
svc_meta = KNOWN_SERVICES.get(svc_str, {})
keys_info[svc_str] = {
"display_name": svc_meta.get("display_name", svc_str),
"key_preview": _mask_key(plaintext),
}
return json.dumps({
"success": True,
"keys": keys_info,
"count": len(keys_info),
})
# ---------------------------------------------------------------
# Pool tool handlers (shared internals + 6 public handlers)
# ---------------------------------------------------------------
def _validate_service_name(service: str) -> str | None:
"""Return an error string if *service* is invalid, else ``None``."""
if not service or not all(c.isalnum() or c in "_-" for c in service):
return "Service name must be non-empty alphanumeric (hyphens/underscores allowed)."
return None
def _validate_api_key_format(api_key: str, service: str = "") -> str | None:
"""Internal helper: validate api key format.
Args:
api_key (str): The api key value.
service (str): The service value.
"""
if not api_key or len(api_key) < 8:
return "API key is too short or empty."
max_len = MAX_KEY_LENGTH_EXTENDED if service in EXTENDED_LIMIT_SERVICES else MAX_KEY_LENGTH_DEFAULT
if len(api_key) > max_len:
return f"API key exceeds maximum length ({max_len} chars)."
return None
async def _donate_to_pool(
redis_key: str,
service: str,
api_key: str,
donor_id: str,
*,
redis_client,
pool_label: str,
) -> str:
"""Shared donate logic for both global and channel pools."""
service = service.strip().lower()
api_key = api_key.strip()
err = _validate_service_name(service)
if err:
return json.dumps({"success": False, "error": err})
err = _validate_api_key_format(api_key, service)
if err:
return json.dumps({"success": False, "error": err})
if service == "brave":
validation = await _validate_brave_key(api_key)
if not validation["valid"]:
return json.dumps({
"success": False,
"error": f"Key validation failed: {validation['error']}",
"service": service,
})
master_key = resolve_master_key()
if master_key is None:
logger.warning("API_KEY_MASTER_KEY not set; refusing to donate to pool")
return json.dumps({
"success": False,
"error": (
"API key encryption is not configured. Set API_KEY_MASTER_KEY "
"in the environment to donate keys to the pool."
),
})
field_hash = api_key_hash(api_key)
already = await redis_client.hexists(redis_key, field_hash)
if not already:
# Also check legacy format (raw api_key as field)
already = await redis_client.hexists(redis_key, api_key)
if already:
return json.dumps({
"success": False,
"error": f"This key is already in the {pool_label}.",
})
pool_key = get_pool_key(master_key)
encrypted_blob = encrypt(api_key, pool_key)
metadata = {
"encrypted_key": encrypted_blob,
"donor_id": str(donor_id),
"donated_at": datetime.now(timezone.utc).isoformat(),
}
await redis_client.hset(redis_key, field_hash, json.dumps(metadata))
display = KNOWN_SERVICES.get(service, {}).get("display_name", service)
count = await redis_client.hlen(redis_key)
logger.info("User %s donated %s key to %s (now %d)", donor_id, service, pool_label, count)
return json.dumps({
"success": True,
"message": f"Your {display} key has been added to the {pool_label}. Thank you!",
"service": service,
"key_preview": _mask_key(api_key),
"pool_size": count,
})
async def _withdraw_from_pool(
redis_key: str,
service: str,
api_key: str,
caller_id: str,
*,
redis_client,
pool_label: str,
) -> str:
"""Shared withdraw logic for both global and channel pools."""
service = service.strip().lower()
api_key = api_key.strip()
# Try new format (hash as field) first, then legacy (raw api_key as field)
field_hash = api_key_hash(api_key)
raw = await redis_client.hget(redis_key, field_hash)
field_to_delete = field_hash
if raw is None:
raw = await redis_client.hget(redis_key, api_key)
field_to_delete = api_key
if raw is None:
return json.dumps({
"success": False,
"error": f"That key was not found in the {pool_label}.",
})
meta_str = raw if isinstance(raw, str) else raw.decode()
try:
meta = json.loads(meta_str)
except json.JSONDecodeError:
meta = {}
if meta.get("donor_id") != str(caller_id):
return json.dumps({
"success": False,
"error": "Only the original donor can withdraw this key.",
})
await redis_client.hdel(redis_key, field_to_delete)
display = KNOWN_SERVICES.get(service, {}).get("display_name", service)
logger.info("User %s withdrew %s key from %s", caller_id, service, pool_label)
return json.dumps({
"success": True,
"message": f"Your {display} key has been removed from the {pool_label}.",
"service": service,
"key_preview": _mask_key(api_key),
})
async def _pool_status(
prefix: str,
suffix_pattern: str,
service: str | None,
*,
redis_client,
pool_label: str,
) -> str:
"""Shared status logic for both global and channel pools."""
if service:
service = service.strip().lower()
rkey = f"{prefix}:{suffix_pattern}".replace("{service}", service)
count = await redis_client.hlen(rkey)
display = KNOWN_SERVICES.get(service, {}).get("display_name", service)
return json.dumps({
"success": True,
"pool": pool_label,
"service": service,
"display_name": display,
"key_count": count,
})
cursor = b"0"
scan_pattern = f"{prefix}:{suffix_pattern}".replace("{service}", "*")
pools: dict[str, int] = {}
while True:
cursor, keys = await redis_client.scan(cursor=cursor, match=scan_pattern, count=200)
for k in keys:
k_str = k if isinstance(k, str) else k.decode()
svc = k_str.rsplit(":", 1)[-1]
pools[svc] = await redis_client.hlen(k_str)
if cursor == 0 or cursor == b"0":
break
return json.dumps({
"success": True,
"pool": pool_label,
"services": {
svc: {
"display_name": KNOWN_SERVICES.get(svc, {}).get("display_name", svc),
"key_count": cnt,
}
for svc, cnt in sorted(pools.items())
},
"total_services": len(pools),
})
# -- Global pool handlers --
async def _donate_api_key_to_pool(
service: str,
api_key: str,
ctx: ToolContext | None = None,
) -> str:
"""Internal helper: donate api key to pool.
Args:
service (str): The service value.
api_key (str): The api key value.
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
r = await _get_redis(ctx)
service = service.strip().lower()
return await _donate_to_pool(
_pool_key(service), service, api_key, ctx.user_id,
redis_client=r, pool_label="global pool",
)
async def _withdraw_api_key_from_pool(
service: str,
api_key: str,
ctx: ToolContext | None = None,
) -> str:
"""Internal helper: withdraw api key from pool.
Args:
service (str): The service value.
api_key (str): The api key value.
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
r = await _get_redis(ctx)
service = service.strip().lower()
return await _withdraw_from_pool(
_pool_key(service), service, api_key, ctx.user_id,
redis_client=r, pool_label="global pool",
)
async def _get_pool_status(
service: str = "",
ctx: ToolContext | None = None,
) -> str:
"""Internal helper: get pool status.
Args:
service (str): The service value.
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
r = await _get_redis(ctx)
return await _pool_status(
GLOBAL_POOL_PREFIX, "{service}",
service or None,
redis_client=r, pool_label="global pool",
)
# -- Channel pool handlers --
async def _donate_api_key_to_channel_pool(
service: str,
api_key: str,
ctx: ToolContext | None = None,
) -> str:
"""Internal helper: donate api key to channel pool.
Args:
service (str): The service value.
api_key (str): The api key value.
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
r = await _get_redis(ctx)
ch = ctx.channel_id
if not ch:
return json.dumps({"success": False, "error": "No channel context available."})
service = service.strip().lower()
return await _donate_to_pool(
_channel_pool_key(ch, service), service, api_key, ctx.user_id,
redis_client=r, pool_label=f"channel pool (channel {ch})",
)
async def _withdraw_api_key_from_channel_pool(
service: str,
api_key: str,
ctx: ToolContext | None = None,
) -> str:
"""Internal helper: withdraw api key from channel pool.
Args:
service (str): The service value.
api_key (str): The api key value.
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
r = await _get_redis(ctx)
ch = ctx.channel_id
if not ch:
return json.dumps({"success": False, "error": "No channel context available."})
service = service.strip().lower()
return await _withdraw_from_pool(
_channel_pool_key(ch, service), service, api_key, ctx.user_id,
redis_client=r, pool_label=f"channel pool (channel {ch})",
)
async def _get_channel_pool_status(
service: str = "",
ctx: ToolContext | None = None,
) -> str:
"""Internal helper: get channel pool status.
Args:
service (str): The service value.
ctx (ToolContext | None): Tool execution context providing access to bot internals.
Returns:
str: Result string.
"""
r = await _get_redis(ctx)
ch = ctx.channel_id
if not ch:
return json.dumps({"success": False, "error": "No channel context available."})
return await _pool_status(
CHANNEL_POOL_PREFIX, f"{ch}:{{service}}",
service.strip().lower() or None,
redis_client=r, pool_label=f"channel pool (channel {ch})",
)
# ---------------------------------------------------------------
# Pool helpers (public)
# ---------------------------------------------------------------
async def _get_pool_api_key_impl(
redis_key: str,
*,
redis_client,
) -> str | None:
"""Pick a random key from a pool hash; decrypt if encrypted."""
try:
field = await redis_client.hrandfield(redis_key)
if field is None:
return None
field_str = field if isinstance(field, str) else field.decode()
raw = await redis_client.hget(redis_key, field_str)
if raw is None:
return None
val_str = raw if isinstance(raw, str) else raw.decode()
try:
meta = json.loads(val_str)
except json.JSONDecodeError:
return field_str # legacy: field was the raw api_key
if "encrypted_key" in meta:
master_key = resolve_master_key()
if master_key:
pool_key = get_pool_key(master_key)
return decrypt(meta["encrypted_key"], pool_key)
return field_str # legacy: field was the raw api_key
except Exception:
raise
[docs]
async def get_pool_api_key(
service: str,
*,
redis_client=None,
) -> str | None:
"""Pick a random key from the global pool for *service*, or ``None``."""
if not redis_client:
return None
try:
return await _get_pool_api_key_impl(
_pool_key(service), redis_client=redis_client,
)
except Exception:
logger.exception("Failed to fetch global pool key for %s", service)
return None
[docs]
async def get_channel_pool_api_key(
channel_id: str,
service: str,
*,
redis_client=None,
) -> str | None:
"""Pick a random key from the channel pool for *service*, or ``None``."""
if not redis_client or not channel_id:
return None
try:
return await _get_pool_api_key_impl(
_channel_pool_key(channel_id, service),
redis_client=redis_client,
)
except Exception:
logger.exception("Failed to fetch channel pool key for %s/%s", channel_id, service)
return None
# ---------------------------------------------------------------
# Public helper for other tools (e.g. brave_search)
# ---------------------------------------------------------------
[docs]
async def get_user_api_key(
user_id: str,
service: str,
*,
redis_client=None,
channel_id: str | None = None,
fallback_to_pool: bool = True,
config=None,
) -> str | None:
"""Return the best available API key for *service*.
Resolution order: user key -> channel pool -> global pool.
"""
if not redis_client:
return None
# 1. Per-user key
if user_id:
try:
val = await redis_client.hget(_redis_key(user_id), service)
if val is not None:
raw_str = val if isinstance(val, str) else val.decode()
if is_encrypted(raw_str):
master_key = resolve_master_key()
if not master_key:
logger.warning(
"Encrypted API key found but API_KEY_MASTER_KEY not set"
)
return None
sqlite_path = (
getattr(config, "api_key_encryption_db_path", None)
or _DEFAULT_ENCRYPTION_DB_PATH
)
try:
user_key = await get_or_create_user_key(
user_id, sqlite_path, master_key
)
return decrypt(raw_str, user_key)
except Exception:
logger.exception(
"Failed to decrypt user API key for %s/%s",
user_id, service,
)
return None
return raw_str # legacy plaintext
except Exception:
logger.exception("Failed to fetch user API key for %s/%s", user_id, service)
# 2. Channel pool
if fallback_to_pool:
if channel_id:
ch_key = await get_channel_pool_api_key(
channel_id, service, redis_client=redis_client,
)
if ch_key:
return ch_key
# 3. Global pool
return await get_pool_api_key(service, redis_client=redis_client)
return None
# ---------------------------------------------------------------
# Multi-tool registration
# ---------------------------------------------------------------
TOOLS = [
{
"name": "set_user_api_key",
"description": (
"Store a personal API key for an external service. "
"The key is stored securely per-user and used "
"automatically when the corresponding tool runs. "
"IMPORTANT: For privacy, recommend the user send "
"their API key via DM rather than in a public channel. "
"Known services: brave, elevenlabs, openrouter, "
"gemini, civitai, vultr, gandi, cloudflare, aws, gcp, oci, gitea. "
"Any other service name is also accepted for "
"future or third-party integrations. "
"Signup links -- Brave Search: "
"https://brave.com/search/api/ (free 2k queries/mo), "
"ElevenLabs: https://elevenlabs.io/, "
"OpenRouter: https://openrouter.ai/keys, "
"Google Gemini: https://aistudio.google.com/apikey, "
"CivitAI: https://civitai.com/user/account, "
"Cloudflare: https://dash.cloudflare.com/profile/api-tokens, "
"AWS: store JSON {\"access_key_id\": \"...\", "
"\"secret_access_key\": \"...\", \"region\": \"us-east-1\"}, "
"GCP: store the full service account JSON, "
"OCI: store JSON {\"tenancy\": \"...\", \"user\": \"...\", "
"\"fingerprint\": \"...\", \"key_content\": \"...\", "
"\"region\": \"us-ashburn-1\"}. "
"Gitea: token only (uses gitea.com) or JSON "
"{\"token\": \"...\", \"base_url\": \"https://your-gitea.example.com\"}"
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": (
"The service to store the key for "
"(e.g. 'brave', 'elevenlabs', "
"'openrouter', 'gemini', etc.)."
),
},
"api_key": {
"type": "string",
"description": "The API key to store.",
},
},
"required": ["service", "api_key"],
},
"handler": _set_user_api_key,
},
{
"name": "remove_user_api_key",
"description": (
"Remove a previously stored personal API key "
"for an external service."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": (
"The service whose key should be removed "
"(e.g. 'brave', 'elevenlabs')."
),
},
},
"required": ["service"],
},
"handler": _remove_user_api_key,
},
{
"name": "list_user_api_keys",
"description": (
"List which external services the user has "
"stored API keys for. Shows masked key previews, "
"never full keys."
),
"parameters": {
"type": "object",
"properties": {},
},
"handler": _list_user_api_keys,
},
# ----- Global pool tools -----
{
"name": "donate_api_key_to_pool",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks to donate or contribute a key to the shared/global "
"pool. By default, use set_user_api_key instead. "
"This tool adds an API key to a globally-shared pool that "
"all users across all channels benefit from. The donor can "
"withdraw their key later with withdraw_api_key_from_pool. "
"Keys are validated before being added (for supported services). "
"IMPORTANT: For privacy, recommend the user send "
"their API key via DM rather than in a public channel."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "The service to donate the key for (e.g. 'brave', 'elevenlabs').",
},
"api_key": {
"type": "string",
"description": "The API key to donate to the global pool.",
},
},
"required": ["service", "api_key"],
},
"handler": _donate_api_key_to_pool,
},
{
"name": "withdraw_api_key_from_pool",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks to withdraw or remove their key from the shared/global "
"pool. Only the original donor can withdraw a key. "
"By default, use remove_user_api_key for personal keys."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "The service whose key to withdraw (e.g. 'brave').",
},
"api_key": {
"type": "string",
"description": "The exact API key to withdraw from the pool.",
},
},
"required": ["service", "api_key"],
},
"handler": _withdraw_api_key_from_pool,
},
{
"name": "get_pool_status",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks about the shared/global API key pool. "
"Shows how many keys are in the global pool per service. "
"Never reveals actual keys."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "Optional: a specific service to check. Omit to list all services.",
},
},
},
"handler": _get_pool_status,
},
# ----- Channel pool tools -----
{
"name": "donate_api_key_to_channel_pool",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks to donate or contribute a key to this channel's "
"shared pool. By default, use set_user_api_key instead. "
"This tool adds an API key to a pool shared only by users "
"in the current channel. The donor can withdraw their key "
"later with withdraw_api_key_from_channel_pool. "
"Keys are validated before being added (for supported services). "
"IMPORTANT: For privacy, recommend the user send "
"their API key via DM rather than in a public channel."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "The service to donate the key for (e.g. 'brave', 'elevenlabs').",
},
"api_key": {
"type": "string",
"description": "The API key to donate to the channel pool.",
},
},
"required": ["service", "api_key"],
},
"handler": _donate_api_key_to_channel_pool,
},
{
"name": "withdraw_api_key_from_channel_pool",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks to withdraw or remove their key from this channel's "
"shared pool. Only the original donor can withdraw a key. "
"By default, use remove_user_api_key for personal keys."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "The service whose key to withdraw (e.g. 'brave').",
},
"api_key": {
"type": "string",
"description": "The exact API key to withdraw from the channel pool.",
},
},
"required": ["service", "api_key"],
},
"handler": _withdraw_api_key_from_channel_pool,
},
{
"name": "get_channel_pool_status",
"description": (
"WARNING: Do NOT use this tool unless the user explicitly "
"asks about this channel's shared API key pool. "
"Shows how many keys are in the current channel's pool "
"per service. Never reveals actual keys."
),
"parameters": {
"type": "object",
"properties": {
"service": {
"type": "string",
"description": "Optional: a specific service to check. Omit to list all services.",
},
},
},
"handler": _get_channel_pool_status,
},
]