"""
API Key Encryption Module
Per-user AES-256-GCM encryption for API keys. Encryption keys are stored in a
dedicated SQLite database, protected by a master KEK from environment.
"""
from __future__ import annotations
import asyncio
import base64
import hashlib
import logging
import os
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
# Module-level logger; resolve_master_key() uses it to warn about an unusable
# master key.
logger = logging.getLogger(__name__)
# Version tag that encrypt() prepends to every token and that decrypt() and
# is_encrypted() look for at the start of a stored value.
ENCRYPTED_PREFIX = "v2:"
"""Prefix for encrypted values in Redis. Values without this are legacy plaintext."""
# Fixed salt passed to PBKDF2 by get_pool_key() when deriving the shared pool
# key from the master key.
POOL_KEY_SALT = b"stargazer:api_key_pool"
def _ensure_db_dir(path: str | Path) -> None:
    """Make sure the directory that will contain the SQLite file exists.

    Only the parent directory of ``path`` is created (together with any
    missing ancestors); the database file itself is not touched. Calling this
    repeatedly is harmless because an already-existing directory is accepted.
    Used by :func:`_get_or_create_user_key_sync` right before it connects to
    the key store.

    Args:
        path: Location of the SQLite file whose containing directory must
            exist.
    """
    parent_dir = Path(path).parent
    parent_dir.mkdir(parents=True, exist_ok=True)
def _init_schema(conn: sqlite3.Connection) -> None:
    """Provision the ``encryption_keys`` table if the database lacks it.

    The table has three columns: ``user_id`` (text primary key),
    ``encrypted_key`` (non-null blob) and ``created_at`` (non-null text).
    The statement uses ``IF NOT EXISTS``, so running it against an already
    initialised database changes nothing. The connection is committed before
    returning. Used by :func:`_get_or_create_user_key_sync` right after it
    connects.

    Args:
        conn: Open SQLite connection to create the table on and commit.
    """
    create_table_sql = """
CREATE TABLE IF NOT EXISTS encryption_keys (
user_id TEXT PRIMARY KEY,
encrypted_key BLOB NOT NULL,
created_at TEXT NOT NULL
)
"""
    conn.execute(create_table_sql)
    conn.commit()
def _unwrap_stored_key(encrypted_blob: bytes, master_key: bytes) -> bytes:
    """Decrypt a stored ``nonce + ciphertext`` blob with the master key.

    Args:
        encrypted_blob: Value read from the ``encrypted_key`` column.
        master_key: Master KEK the blob was wrapped under.

    Returns:
        bytes: The plaintext per-user key.

    Raises:
        ValueError: If the blob is shorter than 28 bytes.
        cryptography.exceptions.InvalidTag: If GCM authentication fails.
    """
    combined = bytes(encrypted_blob)
    # 12-byte nonce plus the 16-byte GCM tag is the smallest well-formed blob.
    if len(combined) < 28:
        raise ValueError("Stored encrypted key too short")
    nonce, ciphertext = combined[:12], combined[12:]
    return AESGCM(master_key).decrypt(nonce, ciphertext, None)


def _get_or_create_user_key_sync(
    user_id: str,
    sqlite_path: str | Path,
    master_key: bytes,
) -> bytes:
    """Load or mint a user's 32-byte data key, persisting it master-key-encrypted.

    Blocking implementation behind :func:`get_or_create_user_key`, which runs
    it through ``asyncio.to_thread``. If a row for ``user_id`` already exists,
    the stored blob is unwrapped with the master key and returned. Otherwise a
    fresh AES-256 key is generated, wrapped under the master key with a random
    12-byte nonce, and inserted as ``nonce + ciphertext`` together with a UTC
    ISO-8601 creation timestamp.

    The insert uses ``INSERT OR IGNORE``. When two callers race to create the
    key for the same user, exactly one row wins; the loser discards the key it
    generated and returns the persisted one, so every caller ends up with the
    same key instead of one of them failing on the primary-key constraint.

    The parent directory is ensured via :func:`_ensure_db_dir`, the schema via
    :func:`_init_schema`, and the SQLite connection is always closed before
    returning.

    Args:
        user_id: Identifier whose per-user key is being looked up or created.
        sqlite_path: Path to the SQLite key store (created if missing).
        master_key: Master KEK used to wrap/unwrap the per-user key.

    Returns:
        bytes: The plaintext 32-byte per-user key.

    Raises:
        ValueError: If a stored encrypted key blob is shorter than the minimum
            nonce-plus-tag length and is therefore unusable.
        cryptography.exceptions.InvalidTag: If the stored key fails GCM
            authentication (for example, a wrong or rotated master key).
        RuntimeError: If the insert was ignored because of a conflicting row
            but that row can no longer be read back.
    """
    select_sql = "SELECT encrypted_key FROM encryption_keys WHERE user_id = ?"

    _ensure_db_dir(sqlite_path)
    conn = sqlite3.connect(str(sqlite_path))
    try:
        _init_schema(conn)

        row = conn.execute(select_sql, (user_id,)).fetchone()
        if row is not None:
            return _unwrap_stored_key(row[0], master_key)

        # No key yet: generate one and wrap it under the master key.
        key = AESGCM.generate_key(bit_length=256)
        nonce = os.urandom(12)
        combined = nonce + AESGCM(master_key).encrypt(nonce, key, None)
        created_at = datetime.now(timezone.utc).isoformat()

        # OR IGNORE turns a primary-key conflict (another caller inserted this
        # user's key after our SELECT) into a no-op instead of an error.
        cursor = conn.execute(
            "INSERT OR IGNORE INTO encryption_keys "
            "(user_id, encrypted_key, created_at) VALUES (?, ?, ?)",
            (user_id, combined, created_at),
        )
        conn.commit()
        if cursor.rowcount == 1:
            return key

        # Our row was ignored, so someone else's key is the persisted one.
        # Return that key; the one generated above was never stored.
        row = conn.execute(select_sql, (user_id,)).fetchone()
        if row is None:
            raise RuntimeError("Per-user encryption key disappeared during creation")
        return _unwrap_stored_key(row[0], master_key)
    finally:
        conn.close()
[docs]
async def get_or_create_user_key(
    user_id: str,
    sqlite_path: str | Path,
    master_key: bytes,
) -> bytes:
    """Fetch (or create on first use) a user's data key without blocking the loop.

    Async front door to :func:`_get_or_create_user_key_sync`. The blocking
    SQLite and crypto work is handed to a worker thread with
    ``asyncio.to_thread`` and the coroutine resumes once that thread returns.
    The resulting key is suitable for :func:`encrypt` and :func:`decrypt`.

    Args:
        user_id: Identifier whose per-user key is being looked up or created.
        sqlite_path: Path to the SQLite key store (created if missing).
        master_key: Master KEK used to wrap/unwrap the per-user key.

    Returns:
        bytes: The plaintext 32-byte per-user key.

    Raises:
        ValueError: Propagated from the sync helper if a stored key blob is
            too short.
        cryptography.exceptions.InvalidTag: Propagated if a stored key fails
            GCM authentication under ``master_key``.
    """
    user_key = await asyncio.to_thread(
        _get_or_create_user_key_sync, user_id, sqlite_path, master_key
    )
    return user_key
[docs]
def encrypt(plaintext: str, key: bytes) -> str:
    """Seal a string with AES-GCM and return it as a tagged base64 token.

    A fresh 12-byte nonce is drawn from ``os.urandom`` on every call, so
    encrypting the same plaintext twice yields different tokens. The token is
    ``ENCRYPTED_PREFIX`` followed by the URL-safe base64 encoding of the nonce
    concatenated with the GCM ciphertext, which is the layout :func:`decrypt`
    expects and the prefix :func:`is_encrypted` checks for. No I/O is
    performed.

    Args:
        plaintext: Text to protect; it is encoded as UTF-8 before encryption.
        key: AES-GCM key (32 bytes for AES-256).

    Returns:
        str: The ``ENCRYPTED_PREFIX``-tagged, URL-safe base64 ciphertext token.
    """
    cipher = AESGCM(key)
    nonce = os.urandom(12)
    sealed = cipher.encrypt(nonce, plaintext.encode("utf-8"), None)
    token = base64.urlsafe_b64encode(nonce + sealed).decode("utf-8")
    return f"{ENCRYPTED_PREFIX}{token}"
[docs]
def decrypt(ciphertext: str, key: bytes) -> str:
    """Recover the plaintext from a token created by :func:`encrypt`.

    The leading ``ENCRYPTED_PREFIX`` is dropped when present (a bare base64
    payload is accepted too). The remainder is URL-safe base64-decoded, the
    first 12 bytes are taken as the nonce, and the rest is
    authenticated-and-decrypted with AES-GCM under ``key``. No I/O is
    performed.

    Args:
        ciphertext: A token from :func:`encrypt`, with or without the
            ``ENCRYPTED_PREFIX`` tag.
        key: AES-GCM key matching the one used to encrypt.

    Returns:
        str: The decrypted UTF-8 plaintext.

    Raises:
        ValueError: If the decoded payload is shorter than 28 bytes (12-byte
            nonce plus the minimum ciphertext-and-tag length).
        cryptography.exceptions.InvalidTag: If GCM authentication fails (wrong
            key or corrupted/tampered ciphertext).
    """
    token = ciphertext.removeprefix(ENCRYPTED_PREFIX)
    raw = base64.urlsafe_b64decode(token)
    if len(raw) < 28:
        raise ValueError("Encrypted data too short")
    nonce, sealed = raw[:12], raw[12:]
    return AESGCM(key).decrypt(nonce, sealed, None).decode("utf-8")
[docs]
def get_pool_key(master_key: bytes) -> bytes:
    """Derive the 32-byte shared-pool encryption key from the master key.

    Runs PBKDF2-HMAC-SHA256 over ``master_key`` with the fixed
    ``POOL_KEY_SALT`` and 100000 iterations. The derivation has no random
    input, so a given master key always yields the same pool key. No I/O is
    performed. The result is meant to be used with :func:`encrypt` and
    :func:`decrypt`.

    Args:
        master_key: Master KEK to derive from.

    Returns:
        bytes: The derived 32-byte pool encryption key.
    """
    derived = hashlib.pbkdf2_hmac(
        "sha256", master_key, POOL_KEY_SALT, 100_000, 32
    )
    return derived
[docs]
def resolve_master_key() -> bytes | None:
    """Read the master key from ``API_KEY_MASTER_KEY`` and validate it.

    The environment value is stripped of surrounding whitespace and URL-safe
    base64-decoded. The decoded key is returned only when it is exactly 32
    bytes long.

    ``None`` is returned in three situations:

    * the variable is unset or blank (silently, nothing is logged);
    * the value decodes to something other than 32 bytes (a warning is
      logged);
    * decoding raises (a warning including the exception is logged).

    Returns:
        bytes | None: The validated 32-byte master key, or ``None`` if it is
        unset or invalid.
    """
    raw_value = os.environ.get("API_KEY_MASTER_KEY", "").strip()
    if raw_value == "":
        return None

    try:
        decoded = base64.urlsafe_b64decode(raw_value)
        if len(decoded) != 32:
            logger.warning("API_KEY_MASTER_KEY is not 32 bytes, ignoring")
            return None
        return decoded
    except Exception as exc:
        logger.warning("Failed to decode API_KEY_MASTER_KEY: %s", exc)
        return None
[docs]
def is_encrypted(value: str) -> bool:
    """Tell whether a stored string carries the encrypted-token prefix.

    This is a purely structural check: it looks only for ``ENCRYPTED_PREFIX``
    at the start of ``value`` and does not attempt to decode or decrypt
    anything. Values without the prefix are treated as legacy plaintext. No
    I/O is performed.

    Args:
        value: The stored string to classify.

    Returns:
        bool: ``True`` if the value starts with ``ENCRYPTED_PREFIX``, else
        ``False``.
    """
    has_prefix = value.startswith(ENCRYPTED_PREFIX)
    return has_prefix
[docs]
def api_key_hash(api_key: str) -> str:
    """Return the hex SHA-256 digest of an API key.

    The key is encoded as UTF-8 and hashed. The same key always produces the
    same digest, so the digest can stand in for the plaintext key as a lookup
    identifier. No I/O is performed.

    Args:
        api_key: The plaintext API key to hash.

    Returns:
        str: The lowercase hexadecimal SHA-256 digest (64 characters).
    """
    digest = hashlib.sha256()
    digest.update(api_key.encode("utf-8"))
    return digest.hexdigest()