api_key_encryption

API Key Encryption Module

Per-user AES-256-GCM encryption for API keys. Each user's encryption key is stored in a dedicated SQLite database, wrapped by a master key-encryption key (KEK) read from the environment.

api_key_encryption.ENCRYPTED_PREFIX = 'v2:'

Prefix that marks encrypted values stored in Redis. Values without this prefix are treated as legacy plaintext.

async api_key_encryption.get_or_create_user_key(user_id, sqlite_path, master_key)[source]

Asynchronously load or create a user's 32-byte encryption key.

Public async entry point for obtaining the stable per-user data key that callers then pass to encrypt() / decrypt(). It offloads the blocking SQLite and crypto work to a worker thread so the event loop is not blocked while the key is loaded or created.

This simply wraps _get_or_create_user_key_sync() via asyncio.to_thread, inheriting its SQLite reads/writes against sqlite_path and its master-key wrapping. Called widely across the secrets and credential tooling – including user_llm_config.py and the tools/ modules manage_secrets, manage_api_keys, sftp_tools, totp_tools, and _credential_profile_store – as well as the scripts/migrate_api_keys_to_encrypted.py migration.

Parameters:
  • user_id (str) – Identifier whose per-user key is being looked up or created.

  • sqlite_path (str | Path) – Path to the SQLite key store (created if missing).

  • master_key (bytes) – 32-byte master KEK used to wrap/unwrap the per-user key.

Returns:

The plaintext 32-byte per-user AES-256-GCM key.

Return type:

bytes

Raises:
  • ValueError – Propagated from the sync helper if a stored key blob is too short.

  • cryptography.exceptions.InvalidTag – Propagated if a stored key fails GCM authentication under master_key.
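The offloading pattern described above can be sketched roughly as follows. This is a minimal illustration, not the module's actual code: the helper name `_load_or_mint_key_sync`, the `user_keys` table, and its columns are hypothetical, and the sketch deliberately omits the master-key wrapping, so the key is stored unwrapped here.

```python
from __future__ import annotations

import asyncio
import os
import sqlite3
import tempfile
from pathlib import Path


def _load_or_mint_key_sync(user_id: str, sqlite_path: str | Path) -> bytes:
    """Hypothetical blocking helper: return the stored key or create one.

    Assumption: unlike the real helper, this stores the key unwrapped.
    """
    conn = sqlite3.connect(str(sqlite_path))  # creates the file if missing
    try:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS user_keys "
            "(user_id TEXT PRIMARY KEY, key BLOB NOT NULL)"
        )
        row = conn.execute(
            "SELECT key FROM user_keys WHERE user_id = ?", (user_id,)
        ).fetchone()
        if row is not None:
            return bytes(row[0])
        key = os.urandom(32)  # fresh 32-byte data key
        conn.execute(
            "INSERT INTO user_keys (user_id, key) VALUES (?, ?)", (user_id, key)
        )
        conn.commit()
        return key
    finally:
        conn.close()


async def load_or_mint_key(user_id: str, sqlite_path: str | Path) -> bytes:
    """Run the blocking helper in a worker thread via asyncio.to_thread."""
    return await asyncio.to_thread(_load_or_mint_key_sync, user_id, sqlite_path)


def demo() -> tuple[bytes, bytes]:
    """Fetch the same user's key twice from a temporary key store."""
    with tempfile.TemporaryDirectory() as tmp:
        db_path = Path(tmp) / "keys.sqlite"

        async def _run() -> tuple[bytes, bytes]:
            first = await load_or_mint_key("alice", db_path)
            second = await load_or_mint_key("alice", db_path)
            return first, second

        return asyncio.run(_run())
```

Because the connection is opened and closed inside the worker thread, no SQLite handle crosses thread boundaries in this sketch.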

api_key_encryption.encrypt(plaintext, key)[source]

Encrypt a string with AES-256-GCM and return a prefixed base64 token.

Wraps a secret value (an API key, credential JSON, TOTP blob, etc.) under the supplied 32-byte key using a fresh random nonce, producing a self-describing ciphertext that decrypt() and is_encrypted() can recognize by its ENCRYPTED_PREFIX tag. Using a new nonce per call keeps GCM secure even when the same plaintext is re-encrypted.

This performs no I/O: it builds an AESGCM cipher from key, draws a 12-byte nonce from os.urandom, and URL-safe base64-encodes the nonce concatenated with the ciphertext, prepending ENCRYPTED_PREFIX. Called by user_llm_config.py and the secret-writing paths in tools/ such as manage_secrets, manage_api_keys, sftp_tools, totp_tools, and _credential_profile_store, plus the migration script – typically with a key from get_or_create_user_key() or get_pool_key().

Parameters:
  • plaintext (str) – The UTF-8 string to encrypt.

  • key (bytes) – 32-byte AES-256-GCM key.

Returns:

The ENCRYPTED_PREFIX-tagged, URL-safe base64 ciphertext token.

Return type:

str

api_key_encryption.decrypt(ciphertext, key)[source]

Decrypt a base64 token produced by encrypt() back to its plaintext.

Reverses encrypt(): it strips the optional ENCRYPTED_PREFIX tag, base64-decodes, splits off the leading nonce, and authenticates-and-decrypts the remainder with AES-256-GCM under the supplied key. GCM authentication means a wrong key or tampered ciphertext fails loudly rather than returning garbage.

This performs no I/O. Called by the secret-reading paths in user_llm_config.py and the tools/ modules manage_api_keys, sftp_tools, totp_tools, and _credential_profile_store, generally with a key from get_or_create_user_key() or get_pool_key().

Parameters:
  • ciphertext (str) – A token from encrypt(), with or without the ENCRYPTED_PREFIX tag.

  • key (bytes) – 32-byte AES-256-GCM key, matching the one used to encrypt.

Returns:

The decrypted UTF-8 plaintext.

Return type:

str

Raises:
  • ValueError – If the decoded payload is shorter than the minimum nonce-plus-tag length.

  • cryptography.exceptions.InvalidTag – If GCM authentication fails (wrong key or corrupted/tampered ciphertext).
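The token layout that encrypt() produces and decrypt() parses can be illustrated without any cipher code. The sketch below only frames and splits the bytes; `frame_token` and `split_token` are hypothetical names, the sealed payload is treated as opaque, and the 16-byte tag length is an assumption based on the usual GCM tag size rather than something the source states.

```python
from __future__ import annotations

import base64

ENCRYPTED_PREFIX = "v2:"  # value documented for this module
NONCE_LEN = 12            # nonce size described for encrypt()
TAG_LEN = 16              # assumption: standard 16-byte GCM tag


def frame_token(nonce: bytes, sealed: bytes) -> str:
    """Build prefix + URL-safe base64(nonce || ciphertext-with-tag)."""
    encoded = base64.urlsafe_b64encode(nonce + sealed).decode("ascii")
    return ENCRYPTED_PREFIX + encoded


def split_token(token: str) -> tuple[bytes, bytes]:
    """Strip the optional prefix, decode, and split off the leading nonce."""
    body = token[len(ENCRYPTED_PREFIX):] if token.startswith(ENCRYPTED_PREFIX) else token
    raw = base64.urlsafe_b64decode(body)
    if len(raw) < NONCE_LEN + TAG_LEN:
        # Mirrors the documented ValueError for payloads that are too short.
        raise ValueError("payload shorter than nonce plus tag")
    return raw[:NONCE_LEN], raw[NONCE_LEN:]
```

In the real functions, the second element would be passed to AES-256-GCM together with the nonce; here it is simply returned.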

api_key_encryption.get_pool_key(master_key)[source]

Derive the shared pool encryption key from the master KEK.

Stretches the master key into a distinct 32-byte key used to encrypt entries in the shared (non-per-user) API key pool, so pool secrets are protected by a key that is separate from any single user’s data key yet still bound to the same master KEK. The derivation is deterministic, so the same master key always yields the same pool key.

This performs no I/O: it runs PBKDF2-HMAC-SHA256 over master_key with the fixed POOL_KEY_SALT and 100000 iterations. The result is paired with encrypt() / decrypt() for pool values. Called by the pool paths in tools/manage_api_keys.py and the migration script scripts/migrate_api_keys_to_encrypted.py.

Parameters:

master_key (bytes) – 32-byte master KEK to stretch.

Returns:

The derived 32-byte pool encryption key.

Return type:

bytes
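A derivation along these lines can be sketched with the standard library. The salt value below is a placeholder (the real POOL_KEY_SALT is not shown in this page), and `derive_pool_key` is a hypothetical name used only for illustration.

```python
import hashlib

# Assumption: placeholder salt; the module's actual POOL_KEY_SALT is not documented here.
POOL_KEY_SALT = b"example-pool-salt"
ITERATIONS = 100_000  # iteration count stated in the description above


def derive_pool_key(master_key: bytes) -> bytes:
    """Derive a 32-byte key from the master key with PBKDF2-HMAC-SHA256."""
    return hashlib.pbkdf2_hmac(
        "sha256", master_key, POOL_KEY_SALT, ITERATIONS, dklen=32
    )
```

Because the salt and iteration count are fixed, repeated calls with the same master key return the same bytes, which is what lets separate processes agree on the pool key without storing it.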

api_key_encryption.resolve_master_key()[source]

Load and validate the master KEK from the environment, or return None.

Resolves the 32-byte master key-encryption key that gates all per-user and pool encryption from the API_KEY_MASTER_KEY environment variable. Callers treat a None result as “encryption disabled” and fall back to legacy plaintext handling, so a missing or malformed key degrades gracefully instead of crashing.

This reads os.environ and URL-safe base64-decodes the value, logging a warning (via the module logger) and returning None when the variable is unset or empty, cannot be decoded, or does not decode to exactly 32 bytes. Called as the first step of nearly every encryption-aware code path – user_llm_config.py, the tools/ secret and credential modules, and the migration script – to decide whether encryption is available.

Returns:

The validated 32-byte master key, or None if it is unset or invalid.

Return type:

bytes | None
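The validation steps described above might look roughly like this. It is a sketch under stated assumptions: `load_master_key` is a hypothetical name, the `environ` parameter exists only to make the example easy to exercise (the documented function takes no arguments), and the warning messages are invented.

```python
from __future__ import annotations

import base64
import binascii
import logging
import os
from collections.abc import Mapping

logger = logging.getLogger(__name__)


def load_master_key(environ: Mapping[str, str] = os.environ) -> bytes | None:
    """Return the 32-byte key from API_KEY_MASTER_KEY, or None if unusable."""
    raw = environ.get("API_KEY_MASTER_KEY", "")
    if not raw:
        logger.warning("API_KEY_MASTER_KEY is not set; encryption disabled")
        return None
    try:
        key = base64.urlsafe_b64decode(raw)
    except (binascii.Error, ValueError):
        logger.warning("API_KEY_MASTER_KEY is not valid URL-safe base64")
        return None
    if len(key) != 32:
        logger.warning("API_KEY_MASTER_KEY must decode to exactly 32 bytes")
        return None
    return key
```

Returning None rather than raising keeps the caller's fallback to legacy plaintext handling a simple `if key is None` check.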

api_key_encryption.is_encrypted(value)[source]

Report whether a stored value is an encrypted token versus legacy plaintext.

Cheap structural check used to distinguish ciphertext written by encrypt() from older plaintext values still living in storage, letting callers decrypt only what needs decrypting and migrate plaintext in place.

This performs no I/O and only tests for the ENCRYPTED_PREFIX tag at the start of the string. Called throughout the secret-reading paths – user_llm_config.py and the tools/ modules manage_secrets, manage_api_keys, sftp_tools, totp_tools, and _credential_profile_store – plus the migration script.

Parameters:

value (str) – The stored string to classify.

Returns:

True if the value carries the encrypted prefix, else False.

Return type:

bool
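A read path that decrypts only prefixed values could be sketched as below. The names `looks_encrypted` and `read_secret` are hypothetical, and `decrypt_fn` stands in for a call such as decrypt() bound to the appropriate key.

```python
from typing import Callable

ENCRYPTED_PREFIX = "v2:"  # value documented for this module


def looks_encrypted(value: str) -> bool:
    """Structural check only: does the value start with the prefix?"""
    return value.startswith(ENCRYPTED_PREFIX)


def read_secret(stored: str, decrypt_fn: Callable[[str], str]) -> str:
    """Decrypt prefixed values; pass legacy plaintext through unchanged."""
    return decrypt_fn(stored) if looks_encrypted(stored) else stored
```

Note that the check says nothing about whether the token is well-formed or decryptable; that is only established when decryption is attempted.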

api_key_encryption.api_key_hash(api_key)[source]

Compute a deterministic SHA-256 lookup hash for a pooled API key.

Produces a stable identifier for an API key so the shared pool can be indexed and deduplicated without ever using the plaintext key itself as a lookup field. Being deterministic, the same key always maps to the same hash, which is what makes pool membership checks possible.

This performs no I/O: it returns the hex SHA-256 digest of the UTF-8-encoded key. Called by the pool-management paths in tools/manage_api_keys.py and the migration script scripts/migrate_api_keys_to_encrypted.py to build the pool’s hash-keyed index.

Parameters:

api_key (str) – The plaintext API key to hash.

Returns:

The lowercase hex SHA-256 digest of the key.

Return type:

str
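The hashing step and a hash-keyed index can be sketched as follows. Only the SHA-256 hex digest comes from the description above; `key_hash`, `add_to_pool`, and the dictionary-based index are hypothetical stand-ins for whatever storage the pool actually uses.

```python
import hashlib


def key_hash(api_key: str) -> str:
    """Lowercase hex SHA-256 digest of the UTF-8-encoded key."""
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()


def add_to_pool(index: dict, api_key: str, stored_token: str) -> bool:
    """Insert a stored (encrypted) token under the key's hash.

    Returns False when the key is already present, so duplicates are
    detected without the plaintext key ever being used as a lookup field.
    """
    digest = key_hash(api_key)
    if digest in index:
        return False
    index[digest] = stored_token
    return True
```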