api_key_encryption
API Key Encryption Module
Per-user AES-256-GCM encryption for API keys. Encryption keys are stored in a dedicated SQLite database, protected by a master KEK from environment.
- api_key_encryption.ENCRYPTED_PREFIX = 'v2:'
Prefix for encrypted values in Redis. Values without this are legacy plaintext.
- async api_key_encryption.get_or_create_user_key(user_id, sqlite_path, master_key)
Asynchronously load or mint a user’s 32-byte per-user encryption key.
Public async entry point for obtaining the stable per-user data key that tools then pass to encrypt()/decrypt(). It offloads the blocking SQLite and crypto work to a worker thread so callers on the event loop never stall.
This simply wraps _get_or_create_user_key_sync() via asyncio.to_thread, inheriting its SQLite reads/writes against sqlite_path and its master-key wrapping. Called widely across the secrets and credential tooling, including user_llm_config.py and the tools/ modules manage_secrets, manage_api_keys, sftp_tools, totp_tools, and _credential_profile_store, as well as the scripts/migrate_api_keys_to_encrypted.py migration.
- Parameters:
- Returns:
The plaintext 32-byte per-user AES-256-GCM key.
- Return type:
- Raises:
ValueError – Propagated from the sync helper if a stored key blob is too short.
cryptography.exceptions.InvalidTag – Propagated if a stored key fails GCM authentication under master_key.
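The thread-offloading pattern described above can be sketched as follows. This is a minimal illustration, not the module's implementation: the `user_keys` table, its schema, and both function names are hypothetical, and the master-key wrapping step is deliberately omitted so the example stays within the standard library.

```python
import asyncio
import os
import sqlite3


def _load_or_mint_key_sync(user_id: str, sqlite_path: str) -> bytes:
    """Hypothetical blocking helper: return the user's key, creating it if absent.

    NOTE: the documented module stores the key wrapped under the master KEK;
    this sketch stores it unwrapped purely to show the control flow.
    """
    conn = sqlite3.connect(sqlite_path)
    try:
        # Assumed schema: one row per user, keyed by user_id.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS user_keys "
            "(user_id TEXT PRIMARY KEY, key BLOB NOT NULL)"
        )
        # INSERT OR IGNORE keeps an existing key, so repeated calls are stable.
        conn.execute(
            "INSERT OR IGNORE INTO user_keys (user_id, key) VALUES (?, ?)",
            (user_id, os.urandom(32)),
        )
        conn.commit()
        row = conn.execute(
            "SELECT key FROM user_keys WHERE user_id = ?", (user_id,)
        ).fetchone()
        return bytes(row[0])
    finally:
        conn.close()


async def load_or_mint_key(user_id: str, sqlite_path: str) -> bytes:
    # Run the blocking SQLite work in a worker thread so the event loop stays free.
    return await asyncio.to_thread(_load_or_mint_key_sync, user_id, sqlite_path)
```

Because the connection is opened and closed inside the worker thread, the sketch avoids sharing a SQLite connection across threads.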
- api_key_encryption.encrypt(plaintext, key)
Encrypt a string with AES-256-GCM and return a prefixed base64 token.
Wraps a secret value (an API key, credential JSON, TOTP blob, etc.) under the supplied 32-byte key using a fresh random nonce, producing a self-describing ciphertext that decrypt() and is_encrypted() can recognize by its ENCRYPTED_PREFIX tag. Using a new nonce per call keeps GCM secure even when the same plaintext is re-encrypted.
This performs no I/O: it builds an AESGCM cipher from key, draws a 12-byte nonce from os.urandom, and URL-safe base64-encodes the nonce concatenated with the ciphertext, prepending ENCRYPTED_PREFIX. Called by user_llm_config.py and the secret-writing paths in tools/ such as manage_secrets, manage_api_keys, sftp_tools, totp_tools, and _credential_profile_store, plus the migration script, typically with a key from get_or_create_user_key() or get_pool_key().
- api_key_encryption.decrypt(ciphertext, key)
Decrypt a base64 token produced by encrypt() back to its plaintext.
Reverses encrypt(): it strips the optional ENCRYPTED_PREFIX tag, base64-decodes, splits off the leading nonce, and authenticates-and-decrypts the remainder with AES-256-GCM under the supplied key. GCM authentication means a wrong key or tampered ciphertext fails loudly rather than returning garbage.
This performs no I/O. Called by the secret-reading paths in user_llm_config.py and the tools/ modules manage_api_keys, sftp_tools, totp_tools, and _credential_profile_store, generally with a key from get_or_create_user_key() or get_pool_key().
- Parameters:
- Returns:
The decrypted UTF-8 plaintext.
- Return type:
- Raises:
ValueError – If the decoded payload is shorter than the minimum nonce-plus-tag length.
cryptography.exceptions.InvalidTag – If GCM authentication fails (wrong key or corrupted/tampered ciphertext).
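The token layout described for encrypt() and decrypt() (prefix, then URL-safe base64 of nonce plus ciphertext) can be sketched as a round trip. This is an illustration under stated assumptions rather than the module's source: the `_sketch` function names are hypothetical, the 16-byte tag length is the AES-GCM default in the `cryptography` package, and the error message text is invented.

```python
import base64
import os

from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

PREFIX = "v2:"   # mirrors ENCRYPTED_PREFIX as documented on this page
NONCE_LEN = 12   # 12-byte nonce, as described for encrypt()
TAG_LEN = 16     # assumption: default AES-GCM tag length


def encrypt_sketch(plaintext: str, key: bytes) -> str:
    nonce = os.urandom(NONCE_LEN)  # fresh nonce on every call
    ct = AESGCM(key).encrypt(nonce, plaintext.encode("utf-8"), None)
    return PREFIX + base64.urlsafe_b64encode(nonce + ct).decode("ascii")


def decrypt_sketch(token: str, key: bytes) -> str:
    # The prefix is optional on input, matching the description of decrypt().
    if token.startswith(PREFIX):
        token = token[len(PREFIX):]
    raw = base64.urlsafe_b64decode(token)
    if len(raw) < NONCE_LEN + TAG_LEN:
        raise ValueError("ciphertext too short")
    nonce, ct = raw[:NONCE_LEN], raw[NONCE_LEN:]
    # Raises InvalidTag if the key is wrong or the data was modified.
    return AESGCM(key).decrypt(nonce, ct, None).decode("utf-8")
```

Encrypting the same plaintext twice yields different tokens because the nonce differs, while decryption with any other key raises InvalidTag instead of returning corrupted text.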
- api_key_encryption.get_pool_key(master_key)
Derive the shared pool encryption key from the master KEK.
Stretches the master key into a distinct 32-byte key used to encrypt entries in the shared (non-per-user) API key pool, so pool secrets are protected by a key that is separate from any single user’s data key yet still bound to the same master KEK. The derivation is deterministic, so the same master key always yields the same pool key.
This performs no I/O: it runs PBKDF2-HMAC-SHA256 over master_key with the fixed POOL_KEY_SALT and 100000 iterations. The result is paired with encrypt()/decrypt() for pool values. Called by the pool paths in tools/manage_api_keys.py and the migration script scripts/migrate_api_keys_to_encrypted.py.
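The derivation described above maps directly onto the standard library's PBKDF2 helper. The sketch below assumes a placeholder salt, since the actual value of POOL_KEY_SALT is not shown on this page, and uses a hypothetical function name.

```python
import hashlib

# Hypothetical placeholder: the module's real POOL_KEY_SALT is not documented here.
POOL_KEY_SALT = b"example-pool-salt"
ITERATIONS = 100_000  # iteration count stated in the description


def derive_pool_key_sketch(master_key: bytes) -> bytes:
    # PBKDF2-HMAC-SHA256 with a fixed salt is deterministic:
    # the same master key always produces the same 32-byte pool key.
    return hashlib.pbkdf2_hmac(
        "sha256", master_key, POOL_KEY_SALT, ITERATIONS, dklen=32
    )
```

Because the salt is fixed, rotating the master key is the only way to change the pool key in this scheme.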
- api_key_encryption.resolve_master_key()
Load and validate the master KEK from the environment, or return None.
Resolves, from the API_KEY_MASTER_KEY environment variable, the 32-byte master key-encryption key that gates all per-user and pool encryption. Callers treat a None result as “encryption disabled” and fall back to legacy plaintext handling, so a missing or malformed key degrades gracefully instead of crashing.
This reads os.environ and URL-safe base64-decodes the value, logging a warning (via the module logger) and returning None when the variable is empty, undecodable, or not exactly 32 bytes. Called as the first step of nearly every encryption-aware code path (user_llm_config.py, the tools/ secret and credential modules, and the migration script) to decide whether encryption is available.
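The validation steps for the master key can be sketched with the standard library alone. The function name and the warning messages below are assumptions for illustration; only the environment variable name, the URL-safe base64 encoding, and the 32-byte requirement come from the description.

```python
import base64
import binascii
import logging
import os
from typing import Optional

logger = logging.getLogger(__name__)


def resolve_master_key_sketch() -> Optional[bytes]:
    raw = os.environ.get("API_KEY_MASTER_KEY", "")
    if not raw:
        logger.warning("API_KEY_MASTER_KEY is empty; encryption disabled")
        return None
    try:
        key = base64.urlsafe_b64decode(raw)
    except (binascii.Error, ValueError):
        logger.warning("API_KEY_MASTER_KEY is not valid base64; encryption disabled")
        return None
    if len(key) != 32:
        logger.warning("API_KEY_MASTER_KEY must decode to exactly 32 bytes")
        return None
    return key
```

A suitable value can be produced with `base64.urlsafe_b64encode(os.urandom(32))`, which encodes 32 random bytes in the format this check expects.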
- api_key_encryption.is_encrypted(value)
Report whether a stored value is an encrypted token versus legacy plaintext.
Cheap structural check used to distinguish ciphertext written by encrypt() from older plaintext values still living in storage, letting callers decrypt only what needs decrypting and migrate plaintext in place.
This performs no I/O and only tests for the ENCRYPTED_PREFIX tag at the start of the string. Called throughout the secret-reading paths (user_llm_config.py and the tools/ modules manage_secrets, manage_api_keys, sftp_tools, totp_tools, and _credential_profile_store) plus the migration script.
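The prefix test and its use for separating encrypted tokens from legacy plaintext might look like the sketch below. Both function names are hypothetical, and `split_for_migration` is an invented helper showing one way a caller could act on the result.

```python
from typing import Iterable, List, Tuple

ENCRYPTED_PREFIX = "v2:"  # value documented at the top of this page


def is_encrypted_sketch(value: str) -> bool:
    # Structural check only: no decoding or decryption is attempted.
    return value.startswith(ENCRYPTED_PREFIX)


def split_for_migration(values: Iterable[str]) -> Tuple[List[str], List[str]]:
    """Hypothetical helper: separate encrypted tokens from legacy plaintext."""
    encrypted: List[str] = []
    legacy: List[str] = []
    for value in values:
        (encrypted if is_encrypted_sketch(value) else legacy).append(value)
    return encrypted, legacy
```

Note that a prefix check cannot confirm the payload is valid ciphertext; a legacy plaintext value that happens to start with the prefix would be misclassified and only fail later at decryption.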
- api_key_encryption.api_key_hash(api_key)
Compute a deterministic SHA-256 lookup hash for a pooled API key.
Produces a stable identifier for an API key so the shared pool can be indexed and deduplicated without ever using the plaintext key itself as a lookup field. Being deterministic, the same key always maps to the same hash, which is what makes pool membership checks possible.
This performs no I/O: it returns the hex SHA-256 digest of the UTF-8-encoded key. Called by the pool-management paths in tools/manage_api_keys.py and the migration script scripts/migrate_api_keys_to_encrypted.py to build the pool’s hash-keyed index.
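The lookup hash and the deduplication it enables can be sketched as follows. The function names and the in-memory dictionary are assumptions for illustration; the documented pool's actual storage layout is not shown on this page.

```python
import hashlib
from typing import Dict


def api_key_hash_sketch(api_key: str) -> str:
    # Hex SHA-256 digest of the UTF-8-encoded key, as described above.
    return hashlib.sha256(api_key.encode("utf-8")).hexdigest()


def add_to_pool(index: Dict[str, str], api_key: str, encrypted_value: str) -> bool:
    """Hypothetical helper: store an entry under its hash, skipping duplicates."""
    digest = api_key_hash_sketch(api_key)
    if digest in index:
        return False  # the same key is already pooled
    index[digest] = encrypted_value
    return True
```

Keying the index by the digest means membership checks never require decrypting stored entries or holding plaintext keys in the lookup field.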