"""Per-user secrets management.
Users can store named secrets (passwords, auth tokens, private keys, etc.)
encrypted per-user. Other tools can reference secrets by name using the
secret:name prefix in credential parameters; the registry resolves these
transparently before the handler runs.
Redis key: stargazer:user_secrets:{user_id}
Secrets are encrypted at rest (AES-256-GCM) with per-user keys in SQLite.
"""
from __future__ import annotations
import asyncio
import json
import logging
from typing import TYPE_CHECKING
from api_key_encryption import (
decrypt,
encrypt,
get_or_create_user_key,
is_encrypted,
resolve_master_key,
)
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
_DEFAULT_ENCRYPTION_DB_PATH = "data/api_key_encryption_keys.db"
REDIS_KEY_PREFIX = "stargazer:user_secrets"
SECRET_REF_PREFIX = "secret:"
MAX_SECRET_VALUE_LENGTH = 16384
def _redis_key(user_id: str) -> str:
return f"{REDIS_KEY_PREFIX}:{user_id}"
def _validate_secret_name(name: str) -> str | None:
"""Return error string if name is invalid, else None."""
if not name or not name.strip():
return "Secret name must be non-empty."
n = name.strip().lower()
if not all(c.isalnum() or c in "_-" for c in n):
return "Secret name must be alphanumeric (hyphens/underscores allowed)."
return None
def _encryption_db_path(ctx) -> str:
"""Return the SQLite path for encryption keys from config or default."""
config = getattr(ctx, "config", None)
if config and hasattr(config, "api_key_encryption_db_path"):
return config.api_key_encryption_db_path
return _DEFAULT_ENCRYPTION_DB_PATH
async def _get_redis(ctx):
r = getattr(ctx, "redis", None)
if r is None:
raise RuntimeError("Redis not available")
return r
# ---------------------------------------------------------------
# Public helper for other tools
# ---------------------------------------------------------------
[docs]
async def resolve_user_secret(
user_id: str,
secret_name: str,
*,
redis_client=None,
config=None,
) -> str | None:
"""Return the decrypted value for a user's secret, or None if not found."""
if not redis_client or not user_id or not secret_name:
return None
name = secret_name.strip().lower()
if not name:
return None
err = _validate_secret_name(name)
if err:
return None
try:
redis_key = _redis_key(user_id)
raw = await redis_client.hget(redis_key, name)
if raw is None:
return None
raw_str = raw if isinstance(raw, str) else raw.decode()
if not is_encrypted(raw_str):
return raw_str # legacy plaintext (shouldn't happen for new secrets)
master_key = resolve_master_key()
if not master_key:
logger.warning("Encrypted secret found but API_KEY_MASTER_KEY not set")
return None
sqlite_path = (
getattr(config, "api_key_encryption_db_path", None)
or _DEFAULT_ENCRYPTION_DB_PATH
)
user_key = await get_or_create_user_key(user_id, sqlite_path, master_key)
return await asyncio.to_thread(decrypt, raw_str, user_key)
except Exception:
logger.exception("Failed to resolve secret %s for user %s", name, user_id)
return None
# ---------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------
async def _store_secret(
name: str,
value: str,
ctx: ToolContext | None = None,
) -> str:
r = await _get_redis(ctx)
uid = ctx.user_id
err = _validate_secret_name(name)
if err:
return json.dumps({"success": False, "error": err})
if not value or not value.strip():
return json.dumps({"success": False, "error": "Secret value must be non-empty."})
if len(value) > MAX_SECRET_VALUE_LENGTH:
return json.dumps({
"success": False,
"error": f"Secret value exceeds maximum length ({MAX_SECRET_VALUE_LENGTH} chars).",
})
master_key = resolve_master_key()
if master_key is None:
logger.warning("API_KEY_MASTER_KEY not set; refusing to store secret")
return json.dumps({
"success": False,
"error": (
"Secrets encryption is not configured. Set API_KEY_MASTER_KEY "
"(base64-encoded 32-byte key) in the environment."
),
})
name_lower = name.strip().lower()
sqlite_path = _encryption_db_path(ctx)
user_key = await get_or_create_user_key(uid, sqlite_path, master_key)
encrypted = encrypt(value.strip(), user_key)
redis_key = _redis_key(uid)
await r.hset(redis_key, name_lower, encrypted)
logger.info("User %s stored secret '%s'", uid, name_lower)
return json.dumps({
"success": True,
"message": f"Secret '{name_lower}' has been stored securely.",
"name": name_lower,
})
async def _delete_secret(
name: str,
ctx: ToolContext | None = None,
) -> str:
r = await _get_redis(ctx)
uid = ctx.user_id
err = _validate_secret_name(name)
if err:
return json.dumps({"success": False, "error": err})
name_lower = name.strip().lower()
redis_key = _redis_key(uid)
deleted = await r.hdel(redis_key, name_lower)
if deleted:
logger.info("User %s deleted secret '%s'", uid, name_lower)
return json.dumps({
"success": True,
"deleted": True,
"message": f"Secret '{name_lower}' has been removed.",
})
return json.dumps({
"success": True,
"deleted": False,
"message": f"No secret named '{name_lower}' found.",
})
async def _list_secrets(
ctx: ToolContext | None = None,
) -> str:
r = await _get_redis(ctx)
uid = ctx.user_id
redis_key = _redis_key(uid)
all_secrets = await r.hgetall(redis_key)
if not all_secrets:
return json.dumps({
"success": True,
"names": [],
"count": 0,
"message": "No secrets stored.",
})
names = sorted(
s if isinstance(s, str) else s.decode()
for s in all_secrets.keys()
)
return json.dumps({
"success": True,
"names": names,
"count": len(names),
})
async def _get_secret(
name: str,
ctx: ToolContext | None = None,
) -> str:
"""Retrieve a secret value. WARNING: Never display the returned value to users in public channels."""
if ctx is None or ctx.redis is None or not ctx.user_id:
return json.dumps({"success": False, "error": "Context not available."})
err = _validate_secret_name(name)
if err:
return json.dumps({"success": False, "error": err})
name_lower = name.strip().lower()
value = await resolve_user_secret(
ctx.user_id,
name_lower,
redis_client=ctx.redis,
config=getattr(ctx, "config", None),
)
if value is None:
return json.dumps({
"success": False,
"error": f"Secret '{name_lower}' not found.",
})
return json.dumps({
"success": True,
"name": name_lower,
"value": value,
})
# ---------------------------------------------------------------
# Multi-tool registration
# ---------------------------------------------------------------
TOOLS = [
{
"name": "store_secret",
"description": (
"Store a named secret (password, auth token, private key, etc.) "
"encrypted per-user. The secret can be referenced in other tools "
"by using secret:name in credential fields (e.g. password=secret:my_db_pass). "
"IMPORTANT: For privacy, recommend the user send secrets via DM "
"rather than in a public channel. Secret names: alphanumeric, hyphens, underscores."
),
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "A name for the secret (e.g. 'my_db_password', 'github_token').",
},
"value": {
"type": "string",
"description": "The secret value to store (password, token, key, etc.).",
},
},
"required": ["name", "value"],
},
"handler": _store_secret,
},
{
"name": "delete_secret",
"description": (
"Delete a previously stored secret by name. "
"Only removes the secret; does not affect tools that may have used it."
),
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the secret to delete.",
},
},
"required": ["name"],
},
"handler": _delete_secret,
},
{
"name": "list_secrets",
"description": (
"List the names of secrets the user has stored. "
"Never shows secret values, only names."
),
"parameters": {
"type": "object",
"properties": {},
},
"handler": _list_secrets,
},
{
"name": "get_secret",
"description": (
"Retrieve the decrypted value of a stored secret by name. "
"Use when you need the value for a tool that does not support secret:name. "
"WARNING: Never display or repeat the returned value to users in public channels. "
"Prefer using secret:name in tool parameters when the tool supports it."
),
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the secret to retrieve.",
},
},
"required": ["name"],
},
"handler": _get_secret,
},
]