# Source code for tools.manage_secrets

"""Per-user secrets management.

Users can store named secrets (passwords, auth tokens, private keys, etc.)
encrypted per-user. Other tools can reference secrets by name using the
secret:name prefix in credential parameters; the registry resolves these
transparently before the handler runs.

Redis key: stargazer:user_secrets:{user_id}
Secrets are encrypted at rest (AES-256-GCM) with per-user keys in SQLite.
"""

from __future__ import annotations

import asyncio
import jsonutil as json
import logging
from typing import TYPE_CHECKING

from tools.alter_privileges import has_privilege, PRIVILEGES
from api_key_encryption import (
    decrypt,
    encrypt,
    get_or_create_user_key,
    is_encrypted,
    resolve_master_key,
)

if TYPE_CHECKING:
    from tool_context import ToolContext

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Fallback SQLite path for the per-user encryption-key store, used when the
# config object does not supply ``api_key_encryption_db_path``.
_DEFAULT_ENCRYPTION_DB_PATH = "data/api_key_encryption_keys.db"

# Prefix of the per-user Redis hash key; ``_redis_key`` appends ``:{user_id}``.
REDIS_KEY_PREFIX = "stargazer:user_secrets"
# Prefix marking a credential parameter as a reference to a stored secret
# (``secret:name``, per the module docstring). The code visible in this module
# does not read it itself.
SECRET_REF_PREFIX = "secret:"
# Upper bound on ``len(value)`` accepted by ``_store_secret``.
MAX_SECRET_VALUE_LENGTH = 16384


def _redis_key(user_id: str) -> str:
    """Build the Redis hash key that holds one user's secrets.

    Joins the module-level ``REDIS_KEY_PREFIX`` with ``user_id`` to produce
    the per-user namespace ``stargazer:user_secrets:{user_id}``. The returned
    key names a Redis hash whose fields are lowercased secret names mapping to
    encrypted values. This is a pure string builder with no side effects.

    Called by every secret operation in this module that touches Redis:
    :func:`resolve_user_secret`, :func:`_store_secret`, :func:`_delete_secret`,
    and :func:`_list_secrets`. (The same helper name exists independently in
    several other ``tools/`` modules; this one is module-local.)

    Args:
        user_id: The platform user identifier whose secret namespace is wanted.

    Returns:
        str: The Redis hash key ``stargazer:user_secrets:{user_id}``.
    """
    return f"{REDIS_KEY_PREFIX}:{user_id}"


def _validate_secret_name(name: str) -> str | None:
    """Validate a secret name and return a human-readable error, or None if valid.

    Enforces the naming contract for secret fields so they are safe to use as
    Redis hash field names and as ``secret:name`` references: the name must be
    non-empty and contain only alphanumerics, hyphens, and underscores. This is
    a pure check with no I/O; it lowercases and strips a copy of the name only
    to test the character set, and does not mutate the caller's value.

    Called by every name-taking operation in this module before touching Redis:
    :func:`resolve_user_secret`, :func:`_store_secret`, :func:`_delete_secret`,
    and :func:`_get_secret`.

    Args:
        name: The candidate secret name to validate.

    Returns:
        str | None: ``None`` when the name is acceptable, otherwise a message
        describing why it was rejected (suitable for returning to the user).
    """
    if not name or not name.strip():
        return "Secret name must be non-empty."
    n = name.strip().lower()
    if not all(c.isalnum() or c in "_-" for c in n):
        return "Secret name must be alphanumeric (hyphens/underscores allowed)."
    return None


def _encryption_db_path(ctx) -> str:
    """Resolve the SQLite path that stores per-user encryption keys.

    Prefers ``ctx.config.api_key_encryption_db_path`` when the tool context
    carries a config object exposing it, otherwise falls back to the
    module-level ``_DEFAULT_ENCRYPTION_DB_PATH`` (``data/api_key_encryption_keys.db``).
    The returned path names the SQLite database that
    :func:`api_key_encryption.get_or_create_user_key` reads to derive each user's
    AES key; this function only inspects ``ctx`` and performs no I/O.

    Called within this module by :func:`_store_secret` when deriving the user
    key to encrypt a new secret. The same helper name is defined independently
    in several sibling ``tools/`` modules (``manage_api_keys``, ``sftp_tools``,
    ``totp_tools``); this copy is module-local.

    Args:
        ctx: The :class:`ToolContext` for the invocation; its optional ``config``
            may carry ``api_key_encryption_db_path``.

    Returns:
        str: The SQLite database path for the encryption key store.
    """
    config = getattr(ctx, "config", None)
    if config and hasattr(config, "api_key_encryption_db_path"):
        return config.api_key_encryption_db_path
    return _DEFAULT_ENCRYPTION_DB_PATH


async def _get_redis(ctx):
    """Return the Redis client from the tool context or fail loudly.

    Extracts the ``redis`` attribute off the :class:`ToolContext` that the
    registry passes into every handler, guarding against a missing client so
    callers can assume a usable connection. Performs no I/O itself despite
    being ``async`` (it matches the awaited call convention used by the
    handlers in this module).

    Called by the three Redis-backed handlers :func:`_store_secret`,
    :func:`_delete_secret`, and :func:`_list_secrets`, each of which awaits it
    before reading or writing the user's secret hash. (Other ``tools/`` modules
    define their own same-named helper; this one is module-local.)

    Args:
        ctx: The :class:`ToolContext` for the current tool invocation, expected
            to carry a connected ``redis`` client.

    Returns:
        The Redis client instance from ``ctx.redis``.

    Raises:
        RuntimeError: If ``ctx`` has no ``redis`` client (``ctx.redis`` is
            ``None`` or absent).
    """
    r = getattr(ctx, "redis", None)
    if r is None:
        raise RuntimeError("Redis not available")
    return r


# ---------------------------------------------------------------
# Public helper for other tools
# ---------------------------------------------------------------


async def resolve_user_secret(
    user_id: str,
    secret_name: str,
    *,
    redis_client=None,
    config=None,
) -> str | None:
    """Decrypt and return one of a user's stored secrets, or ``None``.

    Public read path used to dereference a ``secret:name`` credential. The
    function is fail-soft: incomplete inputs, a non-string/invalid/unknown
    name, an unset master key, and any exception raised while reading or
    decrypting all yield ``None`` instead of propagating, so credential
    resolution never crashes a caller.

    Processing steps:

    1. Normalize the name (strip + lowercase) and validate it with
       :func:`_validate_secret_name`.
    2. ``HGET`` the field from the hash named by :func:`_redis_key`
       (``stargazer:user_secrets:{user_id}``).
    3. If the stored value fails :func:`is_encrypted`, return it unchanged
       (legacy plaintext entry).
    4. Otherwise resolve the master key with :func:`resolve_master_key`,
       derive the per-user key via :func:`get_or_create_user_key` (SQLite
       path from ``config.api_key_encryption_db_path`` or
       ``_DEFAULT_ENCRYPTION_DB_PATH``), and run :func:`decrypt` in a worker
       thread via :func:`asyncio.to_thread`.

    Called by :func:`_get_secret` in this module; per the module docstring it
    is also the hook the tool registry uses to resolve ``secret:name``
    references before a handler runs.

    Args:
        user_id: The platform user whose secret should be read.
        secret_name: The secret's name (stripped and lowercased before lookup).
        redis_client: An async Redis client; if falsy the function returns
            ``None`` immediately.
        config: Optional config object supplying
            ``api_key_encryption_db_path``.

    Returns:
        str | None: The decrypted (or legacy plaintext) secret value, or
        ``None`` if it is missing, the inputs are incomplete or malformed,
        the master key is unset, or reading/decryption fails.
    """
    if not redis_client or not user_id or not secret_name:
        return None
    # Honor the fail-soft contract for malformed references: a non-string
    # name would otherwise raise AttributeError on .strip() below.
    if not isinstance(secret_name, str):
        return None

    name = secret_name.strip().lower()
    if not name or _validate_secret_name(name):
        return None

    try:
        redis_key = _redis_key(user_id)
        raw = await redis_client.hget(redis_key, name)
        if raw is None:
            return None
        # Clients may hand back bytes or str depending on their decode
        # settings; normalize to str before inspecting the payload.
        raw_str = raw if isinstance(raw, str) else raw.decode()

        if not is_encrypted(raw_str):
            return raw_str  # legacy plaintext (shouldn't happen for new secrets)

        master_key = resolve_master_key()
        if not master_key:
            logger.warning("Encrypted secret found but API_KEY_MASTER_KEY not set")
            return None

        sqlite_path = (
            getattr(config, "api_key_encryption_db_path", None)
            or _DEFAULT_ENCRYPTION_DB_PATH
        )
        user_key = await get_or_create_user_key(user_id, sqlite_path, master_key)
        # decrypt is synchronous; keep it off the event loop.
        return await asyncio.to_thread(decrypt, raw_str, user_key)
    except Exception:
        # Deliberate best-effort boundary: log with traceback, report "absent".
        logger.exception("Failed to resolve secret %s for user %s", name, user_id)
        return None
# ---------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------


def _error_json(message: str) -> str:
    """Serialize a failed tool result carrying ``message`` as its ``error``."""
    return json.dumps({"success": False, "error": message})


async def _check_secrets_access(ctx):
    """Fetch the Redis client and user id, then apply the STARGAZER_USE gate.

    Shared preamble of the store/delete/list handlers: obtains the client via
    :func:`_get_redis`, reads ``ctx.user_id`` and the optional ``ctx.config``,
    and asks :func:`tools.alter_privileges.has_privilege` whether the user
    holds ``PRIVILEGES["STARGAZER_USE"]``.

    Args:
        ctx: The tool context for the current invocation.

    Returns:
        tuple: ``(redis, user_id, denial)``. ``denial`` is ``None`` when the
        user holds the privilege, otherwise the ready-to-return JSON error
        string.

    Raises:
        RuntimeError: Propagated from :func:`_get_redis` when the context
            carries no Redis client.
    """
    r = await _get_redis(ctx)
    uid = ctx.user_id
    config = getattr(ctx, "config", None)
    if await has_privilege(r, uid, PRIVILEGES["STARGAZER_USE"], config):
        return r, uid, None
    return r, uid, _error_json("Access denied (STARGAZER_USE required).")


async def _store_secret(
    name: str,
    value: str,
    ctx: ToolContext | None = None,
) -> str:
    """Encrypt and persist one named secret for the current user.

    After the ``STARGAZER_USE`` privilege gate, validates the name with
    :func:`_validate_secret_name`, requires a non-blank value no longer than
    ``MAX_SECRET_VALUE_LENGTH``, and refuses to store anything when
    :func:`resolve_master_key` yields no usable key (so secrets are never
    written in plaintext). The "no usable key" test is ``not master_key``,
    the same rule :func:`resolve_user_secret` applies on the read path, so a
    secret can never be stored under a key state the read path rejects.

    On success the trimmed value is encrypted with the per-user key from
    :func:`get_or_create_user_key` (SQLite path from
    :func:`_encryption_db_path`) and written with ``HSET`` into the hash named
    by :func:`_redis_key`, using the lowercased name as the field. The store
    is logged at info level (name only, never the value).

    Registered as the ``store_secret`` handler in the module-level ``TOOLS``
    list.

    Args:
        name: The secret's name (alphanumeric plus ``-``/``_``); stripped,
            lowercased, and used as the Redis hash field.
        value: The plaintext secret; stored trimmed. The length cap is
            checked against the untrimmed input.
        ctx: The tool context providing ``redis``, ``user_id``, and optional
            ``config``.

    Returns:
        str: A JSON string with ``success`` and either ``message``/``name``
        on success or an ``error`` describing the access, validation, or
        configuration failure.

    Raises:
        RuntimeError: If the context carries no Redis client.
    """
    r, uid, denial = await _check_secrets_access(ctx)
    if denial is not None:
        return denial

    err = _validate_secret_name(name)
    if err:
        return _error_json(err)

    if not value or not value.strip():
        return _error_json("Secret value must be non-empty.")

    if len(value) > MAX_SECRET_VALUE_LENGTH:
        return _error_json(
            f"Secret value exceeds maximum length ({MAX_SECRET_VALUE_LENGTH} chars)."
        )

    master_key = resolve_master_key()
    # Falsy (None or empty) rather than only None: matches the read path.
    if not master_key:
        logger.warning("API_KEY_MASTER_KEY not set; refusing to store secret")
        return _error_json(
            "Secrets encryption is not configured. Set API_KEY_MASTER_KEY "
            "(base64-encoded 32-byte key) in the environment."
        )

    name_lower = name.strip().lower()
    sqlite_path = _encryption_db_path(ctx)
    user_key = await get_or_create_user_key(uid, sqlite_path, master_key)
    encrypted = encrypt(value.strip(), user_key)

    await r.hset(_redis_key(uid), name_lower, encrypted)

    logger.info("User %s stored secret '%s'", uid, name_lower)
    return json.dumps(
        {
            "success": True,
            "message": f"Secret '{name_lower}' has been stored securely.",
            "name": name_lower,
        }
    )


async def _delete_secret(
    name: str,
    ctx: ToolContext | None = None,
) -> str:
    """Remove one named secret from the current user's secrets hash.

    After the ``STARGAZER_USE`` privilege gate and name validation
    (:func:`_validate_secret_name`), issues ``HDEL`` against the hash named by
    :func:`_redis_key` with the lowercased name as the field. A missing
    secret is not an error: the result then reports ``deleted: False``. The
    deletion is logged at info level only when a field was actually removed.

    Registered as the ``delete_secret`` handler in the module-level ``TOOLS``
    list.

    Args:
        name: The secret's name to delete (stripped and lowercased first).
        ctx: The tool context providing ``redis``, ``user_id``, and optional
            ``config``.

    Returns:
        str: A JSON string with ``success`` plus a ``deleted`` boolean and
        ``message``, or an ``error`` on access/validation failure.

    Raises:
        RuntimeError: If the context carries no Redis client.
    """
    r, uid, denial = await _check_secrets_access(ctx)
    if denial is not None:
        return denial

    err = _validate_secret_name(name)
    if err:
        return _error_json(err)

    name_lower = name.strip().lower()
    deleted = await r.hdel(_redis_key(uid), name_lower)

    if deleted:
        logger.info("User %s deleted secret '%s'", uid, name_lower)
        return json.dumps(
            {
                "success": True,
                "deleted": True,
                "message": f"Secret '{name_lower}' has been removed.",
            }
        )
    return json.dumps(
        {
            "success": True,
            "deleted": False,
            "message": f"No secret named '{name_lower}' found.",
        }
    )


async def _list_secrets(
    ctx: ToolContext | None = None,
) -> str:
    """List the names of all secrets the current user has stored.

    After the ``STARGAZER_USE`` privilege gate, issues ``HGETALL`` against the
    hash named by :func:`_redis_key` and returns the sorted field names.
    ``HGETALL`` also fetches the stored ciphertext values, but they are
    discarded without being decrypted or returned; only names leave this
    function.

    Registered as the ``list_secrets`` handler in the module-level ``TOOLS``
    list.

    Args:
        ctx: The tool context providing ``redis``, ``user_id``, and optional
            ``config``.

    Returns:
        str: A JSON string with ``success``, a sorted ``names`` list, and a
        ``count`` (plus a ``message`` when empty), or an ``error`` on access
        failure.

    Raises:
        RuntimeError: If the context carries no Redis client.
    """
    r, uid, denial = await _check_secrets_access(ctx)
    if denial is not None:
        return denial

    all_secrets = await r.hgetall(_redis_key(uid))

    if not all_secrets:
        return json.dumps(
            {
                "success": True,
                "names": [],
                "count": 0,
                "message": "No secrets stored.",
            }
        )

    # Field names may arrive as bytes or str depending on the client's
    # decode settings; iterating the mapping yields its keys.
    names = sorted(s if isinstance(s, str) else s.decode() for s in all_secrets)
    return json.dumps(
        {
            "success": True,
            "names": names,
            "count": len(names),
        }
    )


async def _get_secret(
    name: str,
    ctx: ToolContext | None = None,
) -> str:
    """Return the decrypted value of one named secret for the current user.

    Backs the ``get_secret`` tool, the escape hatch for tools that cannot
    accept a ``secret:name`` reference. The plaintext is placed in the JSON
    result, so callers must never echo it into a public channel.

    Requires a context with a Redis client and a user id, validates the name
    with :func:`_validate_secret_name`, then delegates the Redis read and
    decryption to :func:`resolve_user_secret`. Because that helper is
    fail-soft, every failure there (unknown name, unset master key,
    decryption error) surfaces here as "not found".

    NOTE(review): unlike the store/delete/list handlers, this handler does
    not check ``STARGAZER_USE`` itself. The original documentation describes
    that as intentional (access is said to be gated by the surrounding
    registry flow); confirm that gate exists before relying on it.

    Registered as the ``get_secret`` handler in the module-level ``TOOLS``
    list.

    Args:
        name: The secret's name to retrieve (stripped and lowercased first).
        ctx: The tool context providing ``redis``, ``user_id``, and optional
            ``config``.

    Returns:
        str: A JSON string with ``success``, the ``name``, and the decrypted
        ``value`` on success, or an ``error`` when the context is
        unavailable, the name is invalid, or no such secret could be read.
    """
    if ctx is None or ctx.redis is None or not ctx.user_id:
        return _error_json("Context not available.")

    err = _validate_secret_name(name)
    if err:
        return _error_json(err)

    name_lower = name.strip().lower()
    value = await resolve_user_secret(
        ctx.user_id,
        name_lower,
        redis_client=ctx.redis,
        config=getattr(ctx, "config", None),
    )
    if value is None:
        return _error_json(f"Secret '{name_lower}' not found.")

    return json.dumps(
        {
            "success": True,
            "name": name_lower,
            "value": value,
        }
    )


# ---------------------------------------------------------------
# Multi-tool registration
# ---------------------------------------------------------------

# Tool descriptors exported by this module: each entry pairs a tool name and
# JSON-schema parameter spec with its handler coroutine.
TOOLS = [
    {
        "name": "store_secret",
        "description": (
            "Store a named secret (password, auth token, private key, etc.) "
            "encrypted per-user. The secret can be referenced in other tools "
            "by using secret:name in credential fields (e.g. password=secret:my_db_pass). "
            "IMPORTANT: For privacy, recommend the user send secrets via DM "
            "rather than in a public channel. Secret names: alphanumeric, hyphens, underscores."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "A name for the secret (e.g. 'my_db_password', 'github_token').",
                },
                "value": {
                    "type": "string",
                    "description": "The secret value to store (password, token, key, etc.).",
                },
            },
            "required": ["name", "value"],
        },
        "handler": _store_secret,
    },
    {
        "name": "delete_secret",
        "description": (
            "Delete a previously stored secret by name. "
            "Only removes the secret; does not affect tools that may have used it."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "The name of the secret to delete.",
                },
            },
            "required": ["name"],
        },
        "handler": _delete_secret,
    },
    {
        "name": "list_secrets",
        "description": (
            "List the names of secrets the user has stored. "
            "Never shows secret values, only names."
        ),
        "parameters": {
            "type": "object",
            "properties": {},
        },
        "handler": _list_secrets,
    },
    {
        "name": "get_secret",
        "description": (
            "Retrieve the decrypted value of a stored secret by name. "
            "Use when you need the value for a tool that does not support secret:name. "
            "WARNING: Never display or repeat the returned value to users in public channels. "
            "Prefer using secret:name in tool parameters when the tool supports it."
        ),
        "parameters": {
            "type": "object",
            "properties": {
                "name": {
                    "type": "string",
                    "description": "The name of the secret to retrieve.",
                },
            },
            "required": ["name"],
        },
        "handler": _get_secret,
    },
]