"""Per-user secrets management.
Users can store named secrets (passwords, auth tokens, private keys, etc.)
encrypted per-user. Other tools can reference secrets by name using the
secret:name prefix in credential parameters; the registry resolves these
transparently before the handler runs.
Redis key: stargazer:user_secrets:{user_id}
Secrets are encrypted at rest (AES-256-GCM) with per-user keys in SQLite.
"""
from __future__ import annotations
import asyncio
import jsonutil as json
import logging
from typing import TYPE_CHECKING
from tools.alter_privileges import has_privilege, PRIVILEGES
from api_key_encryption import (
decrypt,
encrypt,
get_or_create_user_key,
is_encrypted,
resolve_master_key,
)
if TYPE_CHECKING:
from tool_context import ToolContext
logger = logging.getLogger(__name__)
_DEFAULT_ENCRYPTION_DB_PATH = "data/api_key_encryption_keys.db"
REDIS_KEY_PREFIX = "stargazer:user_secrets"
SECRET_REF_PREFIX = "secret:"
MAX_SECRET_VALUE_LENGTH = 16384
def _redis_key(user_id: str) -> str:
"""Build the Redis hash key that holds one user's secrets.
Joins the module-level ``REDIS_KEY_PREFIX`` with ``user_id`` to produce
the per-user namespace ``stargazer:user_secrets:{user_id}``. The returned
key names a Redis hash whose fields are lowercased secret names mapping to
encrypted values. This is a pure string builder with no side effects.
Called by every secret operation in this module that touches Redis:
:func:`resolve_user_secret`, :func:`_store_secret`, :func:`_delete_secret`,
and :func:`_list_secrets`. (The same helper name exists independently in
several other ``tools/`` modules; this one is module-local.)
Args:
user_id: The platform user identifier whose secret namespace is wanted.
Returns:
str: The Redis hash key ``stargazer:user_secrets:{user_id}``.
"""
return f"{REDIS_KEY_PREFIX}:{user_id}"
def _validate_secret_name(name: str) -> str | None:
"""Validate a secret name and return a human-readable error, or None if valid.
Enforces the naming contract for secret fields so they are safe to use as
Redis hash field names and as ``secret:name`` references: the name must be
non-empty and contain only alphanumerics, hyphens, and underscores. This is
a pure check with no I/O; it lowercases and strips a copy of the name only
to test the character set, and does not mutate the caller's value.
Called by every name-taking operation in this module before touching Redis:
:func:`resolve_user_secret`, :func:`_store_secret`, :func:`_delete_secret`,
and :func:`_get_secret`.
Args:
name: The candidate secret name to validate.
Returns:
str | None: ``None`` when the name is acceptable, otherwise a message
describing why it was rejected (suitable for returning to the user).
"""
if not name or not name.strip():
return "Secret name must be non-empty."
n = name.strip().lower()
if not all(c.isalnum() or c in "_-" for c in n):
return "Secret name must be alphanumeric (hyphens/underscores allowed)."
return None
def _encryption_db_path(ctx) -> str:
"""Resolve the SQLite path that stores per-user encryption keys.
Prefers ``ctx.config.api_key_encryption_db_path`` when the tool context
carries a config object exposing it, otherwise falls back to the
module-level ``_DEFAULT_ENCRYPTION_DB_PATH`` (``data/api_key_encryption_keys.db``).
The returned path names the SQLite database that
:func:`api_key_encryption.get_or_create_user_key` reads to derive each user's
AES key; this function only inspects ``ctx`` and performs no I/O.
Called within this module by :func:`_store_secret` when deriving the user
key to encrypt a new secret. The same helper name is defined independently
in several sibling ``tools/`` modules (``manage_api_keys``, ``sftp_tools``,
``totp_tools``); this copy is module-local.
Args:
ctx: The :class:`ToolContext` for the invocation; its optional ``config``
may carry ``api_key_encryption_db_path``.
Returns:
str: The SQLite database path for the encryption key store.
"""
config = getattr(ctx, "config", None)
if config and hasattr(config, "api_key_encryption_db_path"):
return config.api_key_encryption_db_path
return _DEFAULT_ENCRYPTION_DB_PATH
async def _get_redis(ctx):
"""Return the Redis client from the tool context or fail loudly.
Extracts the ``redis`` attribute off the :class:`ToolContext` that the
registry passes into every handler, guarding against a missing client so
callers can assume a usable connection. Performs no I/O itself despite
being ``async`` (it matches the awaited call convention used by the
handlers in this module).
Called by the three Redis-backed handlers :func:`_store_secret`,
:func:`_delete_secret`, and :func:`_list_secrets`, each of which awaits it
before reading or writing the user's secret hash. (Other ``tools/`` modules
define their own same-named helper; this one is module-local.)
Args:
ctx: The :class:`ToolContext` for the current tool invocation, expected
to carry a connected ``redis`` client.
Returns:
The Redis client instance from ``ctx.redis``.
Raises:
RuntimeError: If ``ctx`` has no ``redis`` client (``ctx.redis`` is
``None`` or absent).
"""
r = getattr(ctx, "redis", None)
if r is None:
raise RuntimeError("Redis not available")
return r
# ---------------------------------------------------------------
# Public helper for other tools
# ---------------------------------------------------------------
[docs]
async def resolve_user_secret(
user_id: str,
secret_name: str,
*,
redis_client=None,
config=None,
) -> str | None:
"""Decrypt and return one of a user's stored secrets, or None if absent.
This is the public read path other tools use to dereference a ``secret:name``
credential before their handler runs. It validates the name, reads the
ciphertext field from the user's Redis secrets hash, and decrypts it with the
per-user key derived from the master key. Designed to fail soft: any missing
secret, unconfigured master key, or decryption error yields ``None`` rather
than raising, so credential resolution never crashes a caller.
Issues ``HGET`` against the hash named by :func:`_redis_key`
(``stargazer:user_secrets:{user_id}``) via the supplied Redis client. A legacy
plaintext value (failing :func:`api_key_encryption.is_encrypted`) is returned
as-is. Otherwise it resolves the master key with :func:`resolve_master_key`,
derives the user key via :func:`get_or_create_user_key` (reading the SQLite
store at ``config.api_key_encryption_db_path`` or
``_DEFAULT_ENCRYPTION_DB_PATH``), and runs the blocking :func:`decrypt` in a
worker thread. Failures are logged via ``logger.exception``/``logger.warning``.
Called by :func:`_get_secret` in this module and, more importantly, by the
``secret:name`` resolution path in ``tools/__init__.py`` (which imports
``SECRET_REF_PREFIX`` and ``resolve_user_secret`` to rewrite credential
parameters before dispatching a tool handler).
Args:
user_id: The platform user whose secret should be read.
secret_name: The secret's name (lowercased before lookup).
redis_client: An async Redis client; if falsy the function returns
``None`` immediately.
config: Optional config object supplying ``api_key_encryption_db_path``.
Returns:
str | None: The decrypted secret value, or ``None`` if it is missing,
the inputs are incomplete, the master key is unset, or decryption fails.
"""
if not redis_client or not user_id or not secret_name:
return None
name = secret_name.strip().lower()
if not name:
return None
err = _validate_secret_name(name)
if err:
return None
try:
redis_key = _redis_key(user_id)
raw = await redis_client.hget(redis_key, name)
if raw is None:
return None
raw_str = raw if isinstance(raw, str) else raw.decode()
if not is_encrypted(raw_str):
return raw_str # legacy plaintext (shouldn't happen for new secrets)
master_key = resolve_master_key()
if not master_key:
logger.warning("Encrypted secret found but API_KEY_MASTER_KEY not set")
return None
sqlite_path = (
getattr(config, "api_key_encryption_db_path", None)
or _DEFAULT_ENCRYPTION_DB_PATH
)
user_key = await get_or_create_user_key(user_id, sqlite_path, master_key)
return await asyncio.to_thread(decrypt, raw_str, user_key)
except Exception:
logger.exception("Failed to resolve secret %s for user %s", name, user_id)
return None
# ---------------------------------------------------------------
# Tool handlers
# ---------------------------------------------------------------
async def _store_secret(
name: str,
value: str,
ctx: ToolContext | None = None,
) -> str:
"""Encrypt and persist one named secret for the current user.
Validates the name and value, derives a per-user encryption key, encrypts
the value with AES-256-GCM, and stores the ciphertext as a field in the
user's Redis secrets hash. Enforces a non-empty, alphanumeric name and the
``MAX_SECRET_VALUE_LENGTH`` size cap, and refuses to store anything when the
master key is not configured (so secrets are never written in plaintext).
Gated by :func:`tools.alter_privileges.has_privilege` against
``PRIVILEGES["STARGAZER_USE"]``; validates via :func:`_validate_secret_name`;
resolves the encryption master key with :func:`resolve_master_key`, the
per-user key via :func:`get_or_create_user_key` (read from the SQLite path
given by :func:`_encryption_db_path`), and encrypts with :func:`encrypt`. It
then writes the ciphertext with ``HSET`` into the hash named by
:func:`_redis_key` (``stargazer:user_secrets:{user_id}``, field = lowercased
name) and logs the store at info level. The stored value is later readable
via :func:`resolve_user_secret` and the ``secret:name`` reference mechanism.
Registered as the ``store_secret`` tool handler in the module-level
``TOOLS`` list; it is discovered by ``tool_loader`` and dispatched by the
tool registry when the model calls ``store_secret``. No direct internal
callers invoke it by name.
Args:
name: The secret's name (alphanumeric plus ``-``/``_``); lowercased and
used as the Redis hash field.
value: The plaintext secret to encrypt and store (trimmed; must be
non-empty and at most ``MAX_SECRET_VALUE_LENGTH`` characters).
ctx: The :class:`ToolContext` providing ``redis``, ``user_id``, and
optional ``config``.
Returns:
str: A JSON string with ``success`` and either ``message``/``name`` on
success or an ``error`` describing the access, validation, or
configuration failure.
"""
r = await _get_redis(ctx)
uid = ctx.user_id
config = getattr(ctx, "config", None)
if not await has_privilege(r, uid, PRIVILEGES["STARGAZER_USE"], config):
return json.dumps(
{"success": False, "error": "Access denied (STARGAZER_USE required)."}
)
err = _validate_secret_name(name)
if err:
return json.dumps({"success": False, "error": err})
if not value or not value.strip():
return json.dumps(
{"success": False, "error": "Secret value must be non-empty."}
)
if len(value) > MAX_SECRET_VALUE_LENGTH:
return json.dumps(
{
"success": False,
"error": f"Secret value exceeds maximum length ({MAX_SECRET_VALUE_LENGTH} chars).",
}
)
master_key = resolve_master_key()
if master_key is None:
logger.warning("API_KEY_MASTER_KEY not set; refusing to store secret")
return json.dumps(
{
"success": False,
"error": (
"Secrets encryption is not configured. Set API_KEY_MASTER_KEY "
"(base64-encoded 32-byte key) in the environment."
),
}
)
name_lower = name.strip().lower()
sqlite_path = _encryption_db_path(ctx)
user_key = await get_or_create_user_key(uid, sqlite_path, master_key)
encrypted = encrypt(value.strip(), user_key)
redis_key = _redis_key(uid)
await r.hset(redis_key, name_lower, encrypted)
logger.info("User %s stored secret '%s'", uid, name_lower)
return json.dumps(
{
"success": True,
"message": f"Secret '{name_lower}' has been stored securely.",
"name": name_lower,
}
)
async def _delete_secret(
name: str,
ctx: ToolContext | None = None,
) -> str:
"""Remove one named secret from the current user's secrets hash.
Validates the name, then deletes the matching field from the user's Redis
hash. Treats a missing secret as a non-error (reports ``deleted: False``),
and does not touch any tool that may have previously referenced the secret.
Gated by :func:`tools.alter_privileges.has_privilege` against
``PRIVILEGES["STARGAZER_USE"]`` and validated via
:func:`_validate_secret_name`. Issues ``HDEL`` against the hash named by
:func:`_redis_key` (``stargazer:user_secrets:{user_id}``) using the
lowercased name as the field, and logs the deletion at info level when a
field was actually removed.
Registered as the ``delete_secret`` tool handler in the module-level
``TOOLS`` list; discovered by ``tool_loader`` and dispatched by the registry
when the model calls ``delete_secret``. No direct internal callers invoke it
by name.
Args:
name: The secret's name to delete (lowercased before lookup).
ctx: The :class:`ToolContext` providing ``redis``, ``user_id``, and
optional ``config``.
Returns:
str: A JSON string with ``success`` plus a ``deleted`` boolean and
``message`` on success, or an ``error`` on access/validation failure.
"""
r = await _get_redis(ctx)
uid = ctx.user_id
config = getattr(ctx, "config", None)
if not await has_privilege(r, uid, PRIVILEGES["STARGAZER_USE"], config):
return json.dumps(
{"success": False, "error": "Access denied (STARGAZER_USE required)."}
)
err = _validate_secret_name(name)
if err:
return json.dumps({"success": False, "error": err})
name_lower = name.strip().lower()
redis_key = _redis_key(uid)
deleted = await r.hdel(redis_key, name_lower)
if deleted:
logger.info("User %s deleted secret '%s'", uid, name_lower)
return json.dumps(
{
"success": True,
"deleted": True,
"message": f"Secret '{name_lower}' has been removed.",
}
)
return json.dumps(
{
"success": True,
"deleted": False,
"message": f"No secret named '{name_lower}' found.",
}
)
async def _list_secrets(
ctx: ToolContext | None = None,
) -> str:
"""List the names of all secrets the current user has stored.
Reads every field of the user's Redis secrets hash and returns the sorted
secret names. Only names are exposed; values are never read or decrypted
here, so the result is safe to surface to the user.
Gated by :func:`tools.alter_privileges.has_privilege` against
``PRIVILEGES["STARGAZER_USE"]``. Issues ``HGETALL`` against the hash named by
:func:`_redis_key` (``stargazer:user_secrets:{user_id}``) and decodes/sorts
the field names, returning an empty list when no secrets are stored.
Registered as the ``list_secrets`` tool handler in the module-level
``TOOLS`` list; discovered by ``tool_loader`` and dispatched by the registry
when the model calls ``list_secrets``. No direct internal callers invoke it
by name.
Args:
ctx: The :class:`ToolContext` providing ``redis``, ``user_id``, and
optional ``config``.
Returns:
str: A JSON string with ``success``, a sorted ``names`` list, and a
``count`` (plus a ``message`` when empty), or an ``error`` on access
failure.
"""
r = await _get_redis(ctx)
uid = ctx.user_id
config = getattr(ctx, "config", None)
if not await has_privilege(r, uid, PRIVILEGES["STARGAZER_USE"], config):
return json.dumps(
{"success": False, "error": "Access denied (STARGAZER_USE required)."}
)
redis_key = _redis_key(uid)
all_secrets = await r.hgetall(redis_key)
if not all_secrets:
return json.dumps(
{
"success": True,
"names": [],
"count": 0,
"message": "No secrets stored.",
}
)
names = sorted(s if isinstance(s, str) else s.decode() for s in all_secrets.keys())
return json.dumps(
{
"success": True,
"names": names,
"count": len(names),
}
)
async def _get_secret(
name: str,
ctx: ToolContext | None = None,
) -> str:
"""Return the decrypted value of one named secret for the current user.
Backs the ``get_secret`` tool, the explicit escape hatch for the rare case
where a tool cannot accept a ``secret:name`` reference and the model needs
the raw value. The plaintext is returned in the JSON result, so callers must
never echo it into a public channel. Unlike the other handlers here it does
not re-check ``STARGAZER_USE`` (the surrounding registry/privilege flow gates
access); it only guards on a usable context and a valid name.
Validates the name with :func:`_validate_secret_name`, then delegates the
Redis read and decryption to :func:`resolve_user_secret` (passing
``ctx.redis`` and ``ctx.config``), which reads the user's secrets hash named
by :func:`_redis_key`. Registered as the ``get_secret`` handler in the
module-level ``TOOLS`` list and dispatched by the tool registry when the
model calls ``get_secret``; no direct internal callers invoke it by name.
Args:
name: The secret's name to retrieve (lowercased before lookup).
ctx: The :class:`ToolContext` providing ``redis``, ``user_id``, and
optional ``config``.
Returns:
str: A JSON string with ``success``, the ``name``, and the decrypted
``value`` on success, or an ``error`` when the context is unavailable,
the name is invalid, or no such secret exists.
"""
if ctx is None or ctx.redis is None or not ctx.user_id:
return json.dumps({"success": False, "error": "Context not available."})
err = _validate_secret_name(name)
if err:
return json.dumps({"success": False, "error": err})
name_lower = name.strip().lower()
value = await resolve_user_secret(
ctx.user_id,
name_lower,
redis_client=ctx.redis,
config=getattr(ctx, "config", None),
)
if value is None:
return json.dumps(
{
"success": False,
"error": f"Secret '{name_lower}' not found.",
}
)
return json.dumps(
{
"success": True,
"name": name_lower,
"value": value,
}
)
# ---------------------------------------------------------------
# Multi-tool registration
# ---------------------------------------------------------------
TOOLS = [
{
"name": "store_secret",
"description": (
"Store a named secret (password, auth token, private key, etc.) "
"encrypted per-user. The secret can be referenced in other tools "
"by using secret:name in credential fields (e.g. password=secret:my_db_pass). "
"IMPORTANT: For privacy, recommend the user send secrets via DM "
"rather than in a public channel. Secret names: alphanumeric, hyphens, underscores."
),
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "A name for the secret (e.g. 'my_db_password', 'github_token').",
},
"value": {
"type": "string",
"description": "The secret value to store (password, token, key, etc.).",
},
},
"required": ["name", "value"],
},
"handler": _store_secret,
},
{
"name": "delete_secret",
"description": (
"Delete a previously stored secret by name. "
"Only removes the secret; does not affect tools that may have used it."
),
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the secret to delete.",
},
},
"required": ["name"],
},
"handler": _delete_secret,
},
{
"name": "list_secrets",
"description": (
"List the names of secrets the user has stored. "
"Never shows secret values, only names."
),
"parameters": {
"type": "object",
"properties": {},
},
"handler": _list_secrets,
},
{
"name": "get_secret",
"description": (
"Retrieve the decrypted value of a stored secret by name. "
"Use when you need the value for a tool that does not support secret:name. "
"WARNING: Never display or repeat the returned value to users in public channels. "
"Prefer using secret:name in tool parameters when the tool supports it."
),
"parameters": {
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "The name of the secret to retrieve.",
},
},
"required": ["name"],
},
"handler": _get_secret,
},
]