# Source code for tools.manage_secrets

"""Per-user secrets management.

Users can store named secrets (passwords, auth tokens, private keys, etc.)
encrypted per-user. Other tools can reference secrets by name using the
secret:name prefix in credential parameters; the registry resolves these
transparently before the handler runs.

Redis key: stargazer:user_secrets:{user_id}
Secrets are encrypted at rest (AES-256-GCM) with per-user keys in SQLite.
"""

from __future__ import annotations

import asyncio
import json
import logging
from typing import TYPE_CHECKING

from api_key_encryption import (
    decrypt,
    encrypt,
    get_or_create_user_key,
    is_encrypted,
    resolve_master_key,
)

if TYPE_CHECKING:
    from tool_context import ToolContext

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Fallback SQLite path for the per-user encryption keys, used when the config
# does not supply api_key_encryption_db_path.
_DEFAULT_ENCRYPTION_DB_PATH = "data/api_key_encryption_keys.db"

# Prefix of the per-user Redis key; _redis_key() appends ":{user_id}" to it.
REDIS_KEY_PREFIX = "stargazer:user_secrets"
# Prefix marking a credential parameter as a reference to a stored secret
# (see the module docstring). Not consumed by the code in this module.
SECRET_REF_PREFIX = "secret:"
# Upper bound on len(value) accepted when storing a secret.
MAX_SECRET_VALUE_LENGTH = 16384


def _redis_key(user_id: str) -> str:
    """Build the Redis key under which *user_id*'s secrets hash is stored."""
    return "{}:{}".format(REDIS_KEY_PREFIX, user_id)


def _validate_secret_name(name: str) -> str | None:
    """Check a secret name and describe what is wrong with it, if anything.

    The name is judged after stripping surrounding whitespace and
    lower-casing. It must be non-empty and consist solely of characters for
    which ``str.isalnum`` is true, plus ``_`` and ``-``.

    Returns:
        ``None`` when the name is acceptable, otherwise a human-readable
        error message.
    """
    if not (name and name.strip()):
        return "Secret name must be non-empty."

    normalized = name.strip().lower()
    extra_allowed = "_-"
    for ch in normalized:
        if not (ch.isalnum() or ch in extra_allowed):
            return "Secret name must be alphanumeric (hyphens/underscores allowed)."
    return None


def _encryption_db_path(ctx) -> str:
    """Return the SQLite path that holds the per-user encryption keys.

    Reads ``api_key_encryption_db_path`` from ``ctx.config``. Falls back to
    ``_DEFAULT_ENCRYPTION_DB_PATH`` when the context has no config, the
    config lacks the attribute, or the attribute is empty/``None``. This is
    the same rule ``resolve_user_secret`` applies, so storing and resolving
    a secret always agree on which key database to use.

    Args:
        ctx: Tool context (or any object); only its ``config`` attribute is
            consulted. ``None`` is accepted.

    Returns:
        The configured path, or the module default.
    """
    config = getattr(ctx, "config", None)
    configured = getattr(config, "api_key_encryption_db_path", None)
    # ``or`` (rather than a hasattr check) so that an attribute explicitly
    # set to None/"" does not leak through as the database path.
    return configured or _DEFAULT_ENCRYPTION_DB_PATH


async def _get_redis(ctx):
    """Return the Redis client attached to *ctx*.

    Raises:
        RuntimeError: if *ctx* has no ``redis`` attribute or it is ``None``
            (this includes *ctx* itself being ``None``).
    """
    client = getattr(ctx, "redis", None)
    if client is not None:
        return client
    raise RuntimeError("Redis not available")


# ---------------------------------------------------------------
# Public helper for other tools
# ---------------------------------------------------------------


async def resolve_user_secret(
    user_id: str,
    secret_name: str,
    *,
    redis_client=None,
    config=None,
) -> str | None:
    """Return the decrypted value for a user's secret, or None if not found.

    Args:
        user_id: Owner of the secret; selects the per-user Redis hash.
        secret_name: Name of the secret. It is stripped and lower-cased
            before lookup.
        redis_client: Async Redis client providing ``hget``. When falsy the
            function returns ``None`` without doing any I/O.
        config: Optional config object; its ``api_key_encryption_db_path``
            attribute (when set and non-empty) overrides the default SQLite
            path of the encryption-key database.

    Returns:
        The secret value, or ``None`` when any argument is missing, the name
        is invalid, the secret does not exist, the master key is not
        configured, or any error occurs during lookup/decryption.

    This function never raises: failures inside the lookup are logged with
    a traceback and reported as ``None``.
    """
    if not redis_client or not user_id or not secret_name:
        return None

    name = secret_name.strip().lower()
    if not name:
        return None
    if _validate_secret_name(name):
        # Invalid names can never have been stored; treat as "not found".
        return None

    try:
        raw = await redis_client.hget(_redis_key(user_id), name)
        if raw is None:
            return None
        # The client may hand back bytes or str depending on its settings.
        raw_str = raw if isinstance(raw, str) else raw.decode()

        if not is_encrypted(raw_str):
            return raw_str  # legacy plaintext (shouldn't happen for new secrets)

        master_key = resolve_master_key()
        if not master_key:
            logger.warning("Encrypted secret found but API_KEY_MASTER_KEY not set")
            return None

        sqlite_path = (
            getattr(config, "api_key_encryption_db_path", None)
            or _DEFAULT_ENCRYPTION_DB_PATH
        )
        user_key = await get_or_create_user_key(user_id, sqlite_path, master_key)
        # decrypt is a synchronous call; run it in a worker thread.
        return await asyncio.to_thread(decrypt, raw_str, user_key)
    except Exception:
        # Best-effort helper: callers only distinguish "value" from "None".
        logger.exception("Failed to resolve secret %s for user %s", name, user_id)
        return None
# ---------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------


def _error_response(message: str) -> str:
    """Serialize a failed tool result carrying *message*."""
    return json.dumps({"success": False, "error": message})


async def _store_secret(
    name: str,
    value: str,
    ctx: ToolContext | None = None,
) -> str:
    """Encrypt *value* with the user's key and store it under *name*.

    The name is stripped and lower-cased; the value is stripped before
    encryption. An existing secret with the same name is overwritten.

    Returns:
        A JSON string. On success: ``success``, ``message`` and the
        normalized ``name``. On failure: ``success: false`` and ``error``
        (no user in context, invalid name, empty or over-long value, or
        missing master key).

    Raises:
        RuntimeError: if the context provides no Redis client.
    """
    r = await _get_redis(ctx)
    uid = getattr(ctx, "user_id", None)
    if not uid:
        # Without a user id the hash key would end in ":None" and be shared
        # by every caller lacking an identity; refuse, as _get_secret does.
        return _error_response("Context not available.")

    err = _validate_secret_name(name)
    if err:
        return _error_response(err)
    if not value or not value.strip():
        return _error_response("Secret value must be non-empty.")
    if len(value) > MAX_SECRET_VALUE_LENGTH:
        return _error_response(
            f"Secret value exceeds maximum length ({MAX_SECRET_VALUE_LENGTH} chars)."
        )

    master_key = resolve_master_key()
    if master_key is None:
        logger.warning("API_KEY_MASTER_KEY not set; refusing to store secret")
        return _error_response(
            "Secrets encryption is not configured. Set API_KEY_MASTER_KEY "
            "(base64-encoded 32-byte key) in the environment."
        )

    name_lower = name.strip().lower()
    sqlite_path = _encryption_db_path(ctx)
    user_key = await get_or_create_user_key(uid, sqlite_path, master_key)
    encrypted = encrypt(value.strip(), user_key)
    await r.hset(_redis_key(uid), name_lower, encrypted)
    logger.info("User %s stored secret '%s'", uid, name_lower)
    return json.dumps({
        "success": True,
        "message": f"Secret '{name_lower}' has been stored securely.",
        "name": name_lower,
    })


async def _delete_secret(
    name: str,
    ctx: ToolContext | None = None,
) -> str:
    """Remove the secret called *name* from the user's hash.

    Returns:
        A JSON string. ``deleted`` tells whether a secret was actually
        removed; a missing secret is still reported with ``success: true``.
        An invalid name or a context without a user yields
        ``success: false`` and ``error``.

    Raises:
        RuntimeError: if the context provides no Redis client.
    """
    r = await _get_redis(ctx)
    uid = getattr(ctx, "user_id", None)
    if not uid:
        # Never operate on the shared ":None" hash (see _store_secret).
        return _error_response("Context not available.")

    err = _validate_secret_name(name)
    if err:
        return _error_response(err)

    name_lower = name.strip().lower()
    deleted = await r.hdel(_redis_key(uid), name_lower)
    if deleted:
        logger.info("User %s deleted secret '%s'", uid, name_lower)
        return json.dumps({
            "success": True,
            "deleted": True,
            "message": f"Secret '{name_lower}' has been removed.",
        })
    return json.dumps({
        "success": True,
        "deleted": False,
        "message": f"No secret named '{name_lower}' found.",
    })


async def _list_secrets(
    ctx: ToolContext | None = None,
) -> str:
    """List the names (never the values) of the user's stored secrets.

    Returns:
        A JSON string with ``names`` (sorted) and ``count``; when nothing is
        stored, additionally a ``message``. A context without a user yields
        ``success: false`` and ``error``.

    Raises:
        RuntimeError: if the context provides no Redis client.
    """
    r = await _get_redis(ctx)
    uid = getattr(ctx, "user_id", None)
    if not uid:
        # Never list the shared ":None" hash (see _store_secret).
        return _error_response("Context not available.")

    all_secrets = await r.hgetall(_redis_key(uid))
    if not all_secrets:
        return json.dumps({
            "success": True,
            "names": [],
            "count": 0,
            "message": "No secrets stored.",
        })

    # Field names may arrive as bytes or str depending on the client.
    names = sorted(
        s if isinstance(s, str) else s.decode()
        for s in all_secrets
    )
    return json.dumps({
        "success": True,
        "names": names,
        "count": len(names),
    })


async def _get_secret(
    name: str,
    ctx: ToolContext | None = None,
) -> str:
    """Retrieve a secret value.

    WARNING: Never display the returned value to users in public channels.

    Returns:
        A JSON string. On success: ``success``, the normalized ``name`` and
        the decrypted ``value``. On failure: ``success: false`` and
        ``error`` (context missing, invalid name, or secret not found /
        not resolvable).
    """
    if ctx is None or ctx.redis is None or not ctx.user_id:
        return _error_response("Context not available.")

    err = _validate_secret_name(name)
    if err:
        return _error_response(err)

    name_lower = name.strip().lower()
    value = await resolve_user_secret(
        ctx.user_id,
        name_lower,
        redis_client=ctx.redis,
        config=getattr(ctx, "config", None),
    )
    if value is None:
        return _error_response(f"Secret '{name_lower}' not found.")
    return json.dumps({
        "success": True,
        "name": name_lower,
        "value": value,
    })


# ---------------------------------------------------------------
# Multi-tool registration
# ---------------------------------------------------------------

# Tool definitions exported by this module: name, description, JSON-schema
# parameters, and the coroutine that handles the call.
TOOLS = [
    {
        "name": "store_secret",
        "description": (
            "Store a named secret (password, auth token, private key, etc.) "
            "encrypted per-user. The secret can be referenced in other tools "
            "by using secret:name in credential fields (e.g. password=secret:my_db_pass). "
            "IMPORTANT: For privacy, recommend the user send secrets via DM "
            "rather than in a public channel. Secret names: alphanumeric, hyphens, underscores."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "A name for the secret (e.g. 'my_db_password', 'github_token').",
                },
                "value": {
                    "type": "string",
                    "description": "The secret value to store (password, token, key, etc.).",
                },
            },
            "required": ["name", "value"],
        },
        "handler": _store_secret,
    },
    {
        "name": "delete_secret",
        "description": (
            "Delete a previously stored secret by name. "
            "Only removes the secret; does not affect tools that may have used it."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "The name of the secret to delete.",
                },
            },
            "required": ["name"],
        },
        "handler": _delete_secret,
    },
    {
        "name": "list_secrets",
        "description": (
            "List the names of secrets the user has stored. "
            "Never shows secret values, only names."
        ),
        "parameters": {
            "type": "object",
            "properties": {},
        },
        "handler": _list_secrets,
    },
    {
        "name": "get_secret",
        "description": (
            "Retrieve the decrypted value of a stored secret by name. "
            "Use when you need the value for a tool that does not support secret:name. "
            "WARNING: Never display or repeat the returned value to users in public channels. "
            "Prefer using secret:name in tool parameters when the tool supports it."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "The name of the secret to retrieve.",
                },
            },
            "required": ["name"],
        },
        "handler": _get_secret,
    },
]