# Source code for api_key_encryption

"""
API Key Encryption Module

Per-user AES-256-GCM encryption for API keys. Each user's encryption key is
stored in a dedicated SQLite database, wrapped (encrypted) with a master KEK
read from the environment.
"""

from __future__ import annotations

import asyncio
import base64
import hashlib
import logging
import os
import sqlite3
from datetime import datetime, timezone
from pathlib import Path

from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# Module-level logger; resolve_master_key() uses it to warn when the configured
# master key cannot be decoded or has the wrong length.
logger = logging.getLogger(__name__)

# Tag that encrypt() prepends to every token it produces; is_encrypted() tests
# for it and decrypt() strips it before base64-decoding.
ENCRYPTED_PREFIX = "v2:"
"""Prefix for encrypted values in Redis. Values without this are legacy plaintext."""

# Fixed salt passed to PBKDF2 by get_pool_key() when deriving the shared pool
# key from the master KEK. The derivation is deterministic, so changing this
# value changes the derived pool key.
POOL_KEY_SALT = b"stargazer:api_key_pool"


def _ensure_db_dir(path: str | Path) -> None:
    """Ensure the parent directory of the SQLite key store exists on disk.

    Creates any missing parent directories for the given SQLite path so that a
    subsequent ``sqlite3.connect`` does not fail on a first run when the data
    directory has never been provisioned.

    This touches the filesystem via ``Path.mkdir(parents=True, exist_ok=True)``
    and is idempotent. Called only by :func:`_get_or_create_user_key_sync`
    immediately before it opens the encryption-keys database.

    Args:
        path: Filesystem path to the SQLite file whose parent directory should be
            created.
    """
    Path(path).parent.mkdir(parents=True, exist_ok=True)


def _init_schema(conn: sqlite3.Connection) -> None:
    """Create the ``encryption_keys`` table on the connection if it is absent.

    Lazily provisions the single-table schema that maps each ``user_id`` to its
    master-key-encrypted per-user data key plus a creation timestamp, so the
    store works on a fresh database without a separate migration step.

    This issues a ``CREATE TABLE IF NOT EXISTS`` and commits on the given
    SQLite connection (a filesystem-backed database). Called only by
    :func:`_get_or_create_user_key_sync` right after it connects.

    Args:
        conn: Open SQLite connection on which to create the table and commit.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS encryption_keys (
            user_id TEXT PRIMARY KEY,
            encrypted_key BLOB NOT NULL,
            created_at TEXT NOT NULL
        )
    """)
    conn.commit()


def _unwrap_user_key(cipher: AESGCM, stored_blob: bytes) -> bytes:
    """Decrypt a stored ``nonce || ciphertext`` blob back into the raw user key."""
    combined = bytes(stored_blob)
    # 12-byte nonce + 16-byte GCM tag is the smallest blob that can be valid.
    if len(combined) < 28:
        raise ValueError("Stored encrypted key too short")
    return cipher.decrypt(combined[:12], combined[12:], None)


def _get_or_create_user_key_sync(
    user_id: str,
    sqlite_path: str | Path,
    master_key: bytes,
) -> bytes:
    """Load or mint a user's 32-byte data key, persisting it master-key-encrypted.

    Blocking implementation behind :func:`get_or_create_user_key`. If a row for
    ``user_id`` already exists, its blob is unwrapped with the master KEK and
    returned. Otherwise a fresh AES-256 key is generated, wrapped under the
    master key, inserted, and returned.

    Stored blobs are a 12-byte random nonce followed by the AES-GCM ciphertext
    of the user key. The SQLite connection is opened after
    :func:`_ensure_db_dir` has created the parent directory, the schema is
    provisioned with :func:`_init_schema`, and the connection is always closed
    before returning.

    Concurrency: this function runs in worker threads (and possibly in several
    processes), each with its own connection, so two first-time callers for
    the same user can both miss the SELECT. The ``user_id`` primary key lets
    only one INSERT succeed. The caller that loses the race discards its
    freshly generated key and returns the key the winner stored, so every
    caller ends up with the same key instead of one of them failing with
    ``sqlite3.IntegrityError``.

    Args:
        user_id: Identifier whose per-user key is being looked up or created.
        sqlite_path: Path to the SQLite key store (created if missing).
        master_key: 32-byte master KEK used to wrap/unwrap the per-user key.

    Returns:
        bytes: The plaintext 32-byte per-user AES-256-GCM key.

    Raises:
        ValueError: If a stored encrypted key blob is shorter than the minimum
            nonce-plus-tag length and is therefore unusable.
        cryptography.exceptions.InvalidTag: If the stored key fails GCM
            authentication (for example, a wrong or rotated master key).
        sqlite3.IntegrityError: Only if the insert is rejected and no row for
            ``user_id`` can be read back afterwards.
    """
    select_sql = "SELECT encrypted_key FROM encryption_keys WHERE user_id = ?"

    _ensure_db_dir(sqlite_path)
    conn = sqlite3.connect(str(sqlite_path))
    try:
        _init_schema(conn)
        aesgcm = AESGCM(master_key)

        row = conn.execute(select_sql, (user_id,)).fetchone()
        if row is not None:
            return _unwrap_user_key(aesgcm, row[0])

        # No key yet: generate one and store it wrapped under the master key.
        key = AESGCM.generate_key(bit_length=256)
        nonce = os.urandom(12)
        combined = nonce + aesgcm.encrypt(nonce, key, None)
        created_at = datetime.now(timezone.utc).isoformat()
        try:
            conn.execute(
                "INSERT INTO encryption_keys (user_id, encrypted_key, created_at) VALUES (?, ?, ?)",
                (user_id, combined, created_at),
            )
            conn.commit()
        except sqlite3.IntegrityError:
            # Another thread/process inserted this user's key between our
            # SELECT and INSERT. Their key is the canonical one; ours was never
            # persisted, so drop it and return what is actually stored.
            conn.rollback()
            row = conn.execute(select_sql, (user_id,)).fetchone()
            if row is None:
                raise
            return _unwrap_user_key(aesgcm, row[0])
        return key
    finally:
        conn.close()


async def get_or_create_user_key(
    user_id: str,
    sqlite_path: str | Path,
    master_key: bytes,
) -> bytes:
    """Return a user's 32-byte data key without blocking the event loop.

    Async entry point for obtaining the per-user key that callers then hand to
    :func:`encrypt` / :func:`decrypt`. The actual lookup-or-create logic lives
    in :func:`_get_or_create_user_key_sync`, which does blocking SQLite and
    crypto work; this coroutine runs it in a worker thread through
    ``asyncio.to_thread`` and awaits the result.

    Args:
        user_id: Identifier whose per-user key is being looked up or created.
        sqlite_path: Path to the SQLite key store (created if missing).
        master_key: 32-byte master KEK used to wrap/unwrap the per-user key.

    Returns:
        bytes: The plaintext 32-byte per-user AES-256-GCM key.

    Raises:
        ValueError: Propagated from the sync helper if a stored key blob is
            too short.
        cryptography.exceptions.InvalidTag: Propagated if a stored key fails
            GCM authentication under ``master_key``.
    """
    user_key = await asyncio.to_thread(
        _get_or_create_user_key_sync,
        user_id,
        sqlite_path,
        master_key,
    )
    return user_key
def encrypt(plaintext: str, key: bytes) -> str:
    """Seal a string with AES-256-GCM and return it as a prefixed text token.

    The token layout is ``ENCRYPTED_PREFIX`` followed by the URL-safe base64
    encoding of ``nonce || ciphertext``, where the nonce is 12 fresh bytes
    from ``os.urandom``. A new nonce is drawn on every call, so encrypting the
    same plaintext twice yields different tokens. :func:`decrypt` reverses the
    process and :func:`is_encrypted` recognises the prefix.

    No I/O is performed. The key is typically one returned by
    :func:`get_or_create_user_key` or :func:`get_pool_key`.

    Args:
        plaintext: The string to encrypt; it is encoded as UTF-8 first.
        key: 32-byte AES-256-GCM key.

    Returns:
        str: The ``ENCRYPTED_PREFIX``-tagged, URL-safe base64 ciphertext token.
    """
    cipher = AESGCM(key)
    nonce = os.urandom(12)
    sealed = cipher.encrypt(nonce, plaintext.encode("utf-8"), None)
    token = base64.urlsafe_b64encode(nonce + sealed).decode("utf-8")
    return f"{ENCRYPTED_PREFIX}{token}"
def decrypt(ciphertext: str, key: bytes) -> str:
    """Open a token produced by :func:`encrypt` and return the original string.

    Steps, mirroring :func:`encrypt` in reverse: drop a leading
    ``ENCRYPTED_PREFIX`` if one is present, URL-safe base64-decode the rest,
    treat the first 12 bytes as the nonce, and authenticate-and-decrypt the
    remainder with AES-256-GCM. Because GCM is authenticated, a wrong key or a
    modified token raises instead of yielding garbage.

    No I/O is performed.

    Args:
        ciphertext: A token from :func:`encrypt`, with or without the
            ``ENCRYPTED_PREFIX`` tag.
        key: 32-byte AES-256-GCM key, matching the one used to encrypt.

    Returns:
        str: The decrypted UTF-8 plaintext.

    Raises:
        ValueError: If the decoded payload is shorter than 28 bytes (12-byte
            nonce plus 16-byte GCM tag). Base64 decoding errors
            (``binascii.Error``) are also ``ValueError`` subclasses.
        cryptography.exceptions.InvalidTag: If GCM authentication fails (wrong
            key or corrupted/tampered ciphertext).
    """
    token = ciphertext.removeprefix(ENCRYPTED_PREFIX)
    raw = base64.urlsafe_b64decode(token)
    if len(raw) < 28:
        raise ValueError("Encrypted data too short")
    nonce, sealed = raw[:12], raw[12:]
    recovered = AESGCM(key).decrypt(nonce, sealed, None)
    return recovered.decode("utf-8")
def get_pool_key(master_key: bytes) -> bytes:
    """Derive the shared API-key-pool encryption key from the master KEK.

    Pool entries are not tied to a single user, so they are encrypted under a
    key of their own. That key is computed from the master KEK rather than
    stored: PBKDF2-HMAC-SHA256 over ``master_key`` with the fixed
    ``POOL_KEY_SALT``, 100000 iterations, and a 32-byte output. The derivation
    is deterministic, so the same master key always yields the same pool key.

    No I/O is performed. The result is meant to be used with :func:`encrypt`
    and :func:`decrypt`.

    Args:
        master_key: 32-byte master KEK to derive from.

    Returns:
        bytes: The derived 32-byte pool encryption key.
    """
    iterations = 100000
    key_length = 32
    pool_key = hashlib.pbkdf2_hmac(
        "sha256",
        master_key,
        POOL_KEY_SALT,
        iterations,
        key_length,
    )
    return pool_key
def resolve_master_key() -> bytes | None:
    """Load and validate the master KEK from the environment, or return ``None``.

    Reads ``API_KEY_MASTER_KEY``, strips surrounding whitespace, and URL-safe
    base64-decodes it. The decoded value is returned only when it is exactly
    32 bytes long. Callers treat ``None`` as "encryption unavailable", so a
    missing or malformed key degrades gracefully instead of raising.

    Trailing ``=`` padding is optional. URL-safe tokens are commonly written
    without it (``secrets.token_urlsafe`` strips it, for example). Without
    this tolerance such a key would be rejected with "Incorrect padding" and
    encryption would be silently disabled.

    A warning is logged through the module ``logger`` when the variable is
    set but cannot be decoded or does not decode to 32 bytes. The warning never
    includes the key material itself, only the decoder's error message.

    Returns:
        bytes | None: The validated 32-byte master key, or ``None`` if the
        variable is unset, empty, undecodable, or the wrong length.
    """
    key_b64 = os.environ.get("API_KEY_MASTER_KEY", "").strip()
    if not key_b64:
        return None

    # Restore any "=" padding that was dropped. Surplus padding is ignored by
    # the non-strict decoder, so already-padded values decode exactly as before.
    padded = key_b64 + "=" * (-len(key_b64) % 4)
    try:
        key = base64.urlsafe_b64decode(padded)
    except ValueError as e:
        # binascii.Error (bad base64) subclasses ValueError, and non-ASCII
        # input raises ValueError directly; nothing else can come from a str.
        logger.warning("Failed to decode API_KEY_MASTER_KEY: %s", e)
        return None

    if len(key) != 32:
        logger.warning("API_KEY_MASTER_KEY is not 32 bytes, ignoring")
        return None
    return key
def is_encrypted(value: str) -> bool:
    """Tell whether a stored value is an :func:`encrypt` token or legacy plaintext.

    This is a purely structural check: it looks only for ``ENCRYPTED_PREFIX``
    at the start of the string and does not attempt to decode or decrypt
    anything. It lets callers decrypt only the values that need it and treat
    everything else as plaintext written before encryption was introduced.

    No I/O is performed.

    Args:
        value: The stored string to classify.

    Returns:
        bool: ``True`` if the value carries the encrypted prefix, else ``False``.
    """
    carries_prefix = value.startswith(ENCRYPTED_PREFIX)
    return carries_prefix
def api_key_hash(api_key: str) -> str:
    """Return the hex SHA-256 digest of an API key for use as a lookup handle.

    The digest is deterministic: the same key always produces the same
    64-character lowercase hex string. That allows the shared pool to be
    indexed and de-duplicated by hash rather than by the plaintext key.

    No I/O is performed; the key is UTF-8 encoded and hashed in memory.

    Args:
        api_key: The plaintext API key to hash.

    Returns:
        str: The lowercase hex SHA-256 digest of the key.
    """
    hasher = hashlib.sha256()
    hasher.update(api_key.encode("utf-8"))
    return hasher.hexdigest()