Source code for wallet_manager

"""
Ethereum Wallet Manager Module

Handles HD wallet creation, derivation, and encrypted storage in Redis.
Supports any EVM-compatible network through configurable RPC endpoints.
Uses BIP39 mnemonics and BIP44 derivation paths (m/44'/60'/0'/0/x).
"""

import os
import jsonutil as json
import base64
import hashlib
import logging
from typing import Optional, Dict, Any, Tuple, List
from datetime import datetime

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.exceptions import InvalidTag
from wallet_key_utils import ensure_master_key

logger = logging.getLogger(__name__)

WALLET_KEY_PREFIX = "eth_wallet:"
WALLET_INDEX_PREFIX = "eth_wallet_index:"
WALLET_MASTER_KEY_ENV = "ETH_WALLET_MASTER_KEY"
WALLET_MASTER_REDIS_KEY = "wallet_master_key:eth"


[docs] class WalletManager: """ Manages Ethereum HD wallets with encrypted storage in Redis. Features: - BIP39 mnemonic generation and import - BIP44 derivation (m/44'/60'/0'/0/x) - AES-256-GCM encryption for private keys at rest - Per-user wallet isolation - Address caching for derived addresses Unlike the v2 singleton, this class accepts an async Redis client via its async methods so it can work with the v3 ToolContext.redis. """
[docs] def __init__(self): """Create a wallet manager with no master key loaded yet. Leaves ``_master_key`` as ``None``; it is fetched lazily by :meth:`_ensure_master_key` on the first operation that needs to encrypt or decrypt, so construction touches no Redis or environment. The module instantiates a process-wide singleton ``wallet_manager`` at import time (imported by ``tools/eth_wallet_tools`` and the test suite), so this constructor is effectively called once per process. """ self._master_key: Optional[bytes] = None
async def _get_user_aliases(self, redis_client: Any, user_id: str, config: Any = None) -> list[str]: """Resolve every ``platform:user_id`` alias for a raw or prefixed user id. Wallet keys are stored per platform identity, so a user known on Discord must still see the same wallets when they arrive over Matrix or webchat. This expands a single id into all of that person's linked identities by delegating to :meth:`IdentityRegistry.resolve_ingress_identity`. If *user_id* already carries a recognised ``platform:`` prefix it resolves that directly; otherwise it probes each configured platform's ``stargazer:identity:alias:{platform}:{user_id}`` Redis key to find which platform owns the bare id, then resolves from there. Returns ``[user_id]`` unchanged when there is no Redis client or no identity mapping exists. Reads Redis (``GET`` on alias keys) and the identity registry. Called by nearly every read/mutate method on this class (:meth:`wallet_exists`, :meth:`get_wallet`, :meth:`get_decrypted_seed`, :meth:`derive_address`, :meth:`get_private_key`, :meth:`list_wallets`, :meth:`delete_wallet`) to fan an operation across all of a user's aliases. Args: redis_client: The async Redis client (``None`` short-circuits to ``[user_id]``). user_id: A raw user id or a ``platform:user_id`` string. config: Optional config supplying ``configured_platforms``; defaults to discord/matrix/webchat when absent. Returns: list[str]: The resolved alias ids (each ``platform:user_id``), or ``[user_id]`` when nothing could be resolved. """ if not redis_client: return [user_id] platforms = ["discord", "matrix", "webchat"] if config and hasattr(config, "configured_platforms"): platforms = config.configured_platforms platform = None actual_uid = user_id if ":" in user_id: parts = user_id.split(":", 1) if parts[0].lower() in platforms: platform = parts[0].lower() actual_uid = parts[1] from services.identity_registry import IdentityRegistry if platform: _, aliases = await IdentityRegistry.resolve_ingress_identity(platform, actual_uid, redis_client) return aliases for plat in platforms: sg_uuid_raw = await redis_client.get(f"stargazer:identity:alias:{plat}:{user_id}") if sg_uuid_raw: _, aliases = await IdentityRegistry.resolve_ingress_identity(plat, user_id, redis_client) return aliases return [user_id] async def _ensure_master_key(self, redis_client) -> None: """Lazily load (and cache) the wallet master key, once per process. Delegates to :func:`wallet_key_utils.ensure_master_key`, which returns the already-cached :attr:`_master_key` if set, otherwise reads the 32-byte AES key exclusively from the ``ETH_WALLET_MASTER_KEY`` environment variable (the ``redis_client`` and ``wallet_master_key:eth`` Redis key are passed through but ignored, as Redis persistence was removed for security); a missing variable raises and disables wallet features. The resolved key is stored back on :attr:`_master_key` so subsequent calls are free. This master key feeds :meth:`_derive_wallet_key`, so it must be loaded before any encrypt/decrypt; accordingly it is called at the top of :meth:`create_wallet`, :meth:`import_private_key`, :meth:`get_decrypted_seed`, :meth:`derive_address`, and :meth:`get_private_key`. Args: redis_client: The async Redis client (forwarded for signature compatibility but not used to load the key). """ self._master_key = await ensure_master_key( self._master_key, redis_client, WALLET_MASTER_REDIS_KEY, WALLET_MASTER_KEY_ENV, ) def _derive_wallet_key(self, user_id: str, wallet_name: str) -> bytes: """Derive the per-wallet AES key from the master key via PBKDF2. Stretches the process-wide :attr:`_master_key` into a wallet-specific 32-byte key using ``PBKDF2-HMAC-SHA256`` (100,000 iterations) with the ``"{user_id}:{wallet_name}"`` pair as the salt, so each wallet's seed is encrypted under its own key and two wallets never share key material. Pure CPU helper with no I/O; assumes :meth:`_ensure_master_key` has already populated the master key. Called by :meth:`create_wallet` and :meth:`import_private_key` (to encrypt the seed) and by :meth:`get_decrypted_seed` (to decrypt it). Args: user_id: The (platform-scoped) user id owning the wallet. wallet_name: The wallet's name. Returns: bytes: A 32-byte AES-256 key for AES-GCM encryption of the seed. """ salt = f"{user_id}:{wallet_name}".encode("utf-8") derived = hashlib.pbkdf2_hmac( "sha256", self._master_key, salt, iterations=100000, dklen=32 ) return derived def _encrypt(self, key: bytes, plaintext: str) -> str: """Encrypt a secret string with AES-256-GCM under a per-wallet key. Generates a fresh random 12-byte nonce, AEAD-encrypts the UTF-8 plaintext with :class:`AESGCM`, prepends the nonce to the ciphertext, and returns the whole blob URL-safe base64-encoded — the exact layout :meth:`_decrypt` expects. Pure crypto helper with no I/O. Called by :meth:`create_wallet` and :meth:`import_private_key` to protect the mnemonic/private key (the ``encrypted_seed``) before it is written to Redis. Args: key: The 32-byte AES key from :meth:`_derive_wallet_key`. plaintext: The secret (mnemonic or private key) to encrypt. Returns: str: URL-safe base64 of ``nonce || ciphertext``. """ aesgcm = AESGCM(key) nonce = os.urandom(12) plaintext_bytes = plaintext.encode("utf-8") ciphertext = aesgcm.encrypt(nonce, plaintext_bytes, None) combined = nonce + ciphertext return base64.urlsafe_b64encode(combined).decode("utf-8") def _decrypt(self, key: bytes, encrypted_data: str) -> str: """Decrypt an AES-256-GCM blob produced by :meth:`_encrypt`. Base64-decodes the input, splits off the leading 12-byte nonce from the ciphertext, and AEAD-decrypts with :class:`AESGCM`, returning the UTF-8 plaintext. The GCM tag is verified during decryption, so a wrong key or tampered data fails loudly rather than returning garbage. Pure crypto helper with no I/O. Called by :meth:`get_decrypted_seed`, which catches the resulting errors to skip aliases whose key does not match. Args: key: The 32-byte AES key from :meth:`_derive_wallet_key`. encrypted_data: The URL-safe base64 ``nonce || ciphertext`` blob. Returns: str: The decrypted plaintext (the mnemonic or private key). Raises: ValueError: If the decoded blob is shorter than nonce+tag (28 bytes). cryptography.exceptions.InvalidTag: If authentication fails (wrong key or corrupted ciphertext). """ combined = base64.urlsafe_b64decode(encrypted_data.encode("utf-8")) if len(combined) < 28: raise ValueError("Encrypted data too short") nonce = combined[:12] ciphertext = combined[12:] aesgcm = AESGCM(key) plaintext_bytes = aesgcm.decrypt(nonce, ciphertext, None) return plaintext_bytes.decode("utf-8")
[docs] @staticmethod def generate_mnemonic(strength: int = 128) -> str: """Generate a fresh BIP39 English mnemonic seed phrase. Uses the ``mnemonic`` library (imported lazily) to produce a new random recovery phrase whose word count follows the entropy *strength* (128 bits -> 12 words, 256 -> 24). Pure CPU helper with no I/O. Called by :meth:`create_wallet` when no mnemonic is supplied, and directly by the wallet tool layers (``tools/eth_wallet_tools``, ``tools/btc_wallet_tools``) when generating a new wallet for a user. Args: strength: Entropy in bits (e.g. ``128`` or ``256``). Returns: str: A space-separated BIP39 mnemonic phrase. """ from mnemonic import Mnemonic mnemo = Mnemonic("english") return mnemo.generate(strength=strength)
[docs] @staticmethod def validate_mnemonic(mnemonic: str) -> bool: """Check that a string is a valid BIP39 English mnemonic. Verifies both the wordlist membership and the embedded checksum via the ``mnemonic`` library (imported lazily). Pure CPU helper with no I/O. Called by :meth:`create_wallet` to reject a bad imported phrase, and by the tool layers (``tools/eth_wallet_tools``, ``tools/btc_wallet_tools``) and the identity-registry tests to validate user- or seed-supplied phrases. Args: mnemonic: The candidate mnemonic phrase. Returns: bool: ``True`` if the phrase is a valid BIP39 mnemonic, else ``False``. """ from mnemonic import Mnemonic mnemo = Mnemonic("english") return mnemo.check(mnemonic)
[docs] @staticmethod def derive_address_from_mnemonic(mnemonic: str, index: int = 0) -> Tuple[str, str]: """Derive the EVM address and private key at one BIP44 index. Walks the BIP44 Ethereum derivation path ``m/44'/60'/0'/0/{index}`` from the given mnemonic using ``eth_account`` (enabling its unaudited HD wallet feature first), returning the checksummed address and its hex private key. Pure CPU helper with no I/O. Called by :meth:`create_wallet` (index 0), :meth:`derive_address` (caching newly derived addresses), and :meth:`get_private_key` (to recover the key at a given index). Args: mnemonic: A BIP39 seed phrase. index: The address index along the standard external chain. Returns: Tuple[str, str]: ``(checksummed_address, hex_private_key)``. """ from eth_account import Account Account.enable_unaudited_hdwallet_features() path = f"m/44'/60'/0'/0/{index}" account = Account.from_mnemonic(mnemonic, account_path=path) return account.address, account.key.hex()
[docs] @staticmethod def derive_address_from_private_key(private_key: str) -> str: """Compute the EVM address for a raw private key. Normalises the key to ``0x``-prefixed form and loads it through ``eth_account.Account.from_key`` to obtain the checksummed address. Pure CPU helper with no I/O. Called by :meth:`import_private_key` when registering a simple (non-HD) wallet. Args: private_key: A hex private key, with or without the ``0x`` prefix. Returns: str: The checksummed EVM address for that key. """ from eth_account import Account if not private_key.startswith("0x"): private_key = "0x" + private_key account = Account.from_key(private_key) return account.address
[docs] @staticmethod def is_valid_private_key(private_key: str) -> bool: """Report whether a string is a usable EVM private key. Attempts to load the (``0x``-normalised) key via ``eth_account.Account.from_key`` and returns ``False`` on any exception, making it a safe pre-flight check that never raises. Pure CPU helper with no I/O. Called by :meth:`import_private_key` to reject malformed input before storing anything. Args: private_key: A candidate hex private key, with or without ``0x``. Returns: bool: ``True`` if the key parses, else ``False``. """ try: from eth_account import Account if not private_key.startswith("0x"): private_key = "0x" + private_key Account.from_key(private_key) return True except Exception: return False
[docs] async def create_wallet( self, user_id: str, wallet_name: str, redis_client, mnemonic: Optional[str] = None, strength: int = 128, ) -> Dict[str, Any]: """Create and persist a new HD wallet for a user. Validates and normalises *wallet_name* (1-32 alphanumeric chars plus ``-``/``_``), ensures it does not already exist across the user's aliases, then either validates the supplied *mnemonic* or generates a fresh one via :meth:`generate_mnemonic`. It derives the index-0 address with :meth:`derive_address_from_mnemonic`, encrypts the mnemonic under a per-wallet key (:meth:`_derive_wallet_key` then :meth:`_encrypt`), and writes the wallet record to the ``eth_wallet:{user_id}:{wallet_name}`` Redis key while adding the name to the ``eth_wallet_index:{user_id}`` set. Touches the master key (:meth:`_ensure_master_key`) and Redis (``SET`` plus ``SADD``); logs the creation. Invoked by ``tools/eth_wallet_tools`` (the ``create``/``import`` flows) and the identity-registry tests. Args: user_id: The user the wallet belongs to. wallet_name: Desired wallet name (case-folded, validated). redis_client: The async Redis client for persistence. mnemonic: An existing BIP39 phrase to import, or ``None`` to generate one. strength: Entropy in bits when generating a new mnemonic. Returns: Dict[str, Any]: ``wallet_name``, ``type`` (``"hd"``), ``address``, and ``created_at`` for the new wallet (the seed is never returned). Raises: ValueError: If the name is invalid, the wallet already exists, or a supplied mnemonic fails validation. """ wallet_name = wallet_name.strip().lower() if not wallet_name or len(wallet_name) > 32: raise ValueError("Wallet name must be 1-32 characters") if not wallet_name.replace("_", "").replace("-", "").isalnum(): raise ValueError("Wallet name must be alphanumeric (with - or _)") await self._ensure_master_key(redis_client) if await self.wallet_exists(user_id, wallet_name, redis_client): raise ValueError(f"Wallet '{wallet_name}' already exists") if mnemonic: if not self.validate_mnemonic(mnemonic): raise ValueError("Invalid mnemonic phrase") else: mnemonic = self.generate_mnemonic(strength=strength) address, _ = self.derive_address_from_mnemonic(mnemonic, index=0) encryption_key = self._derive_wallet_key(user_id, wallet_name) encrypted_seed = self._encrypt(encryption_key, mnemonic) wallet_data = { "type": "hd", "encrypted_seed": encrypted_seed, "addresses": {"0": address}, "created_at": datetime.utcnow().isoformat(), } wallet_key = f"{WALLET_KEY_PREFIX}{user_id}:{wallet_name}" await redis_client.set(wallet_key, json.dumps(wallet_data)) index_key = f"{WALLET_INDEX_PREFIX}{user_id}" await redis_client.sadd(index_key, wallet_name) logger.info(f"Created HD wallet '{wallet_name}' for user {user_id}") return { "wallet_name": wallet_name, "type": "hd", "address": address, "created_at": wallet_data["created_at"], }
[docs] async def import_private_key( self, user_id: str, wallet_name: str, private_key: str, redis_client, ) -> Dict[str, Any]: """Import a raw private key as a simple (non-HD) wallet. Mirrors :meth:`create_wallet` but for a single externally-supplied key: it validates the wallet name, ensures the wallet does not already exist, normalises and validates the key (:meth:`is_valid_private_key`), derives its address (:meth:`derive_address_from_private_key`), encrypts the key under a per-wallet key (:meth:`_derive_wallet_key` then :meth:`_encrypt`), and stores a ``type: simple`` record at ``eth_wallet:{user_id}:{wallet_name}`` while adding the name to the ``eth_wallet_index:{user_id}`` set. Touches the master key (:meth:`_ensure_master_key`) and Redis (``SET`` plus ``SADD``); logs the import. Called by ``tools/eth_wallet_tools``. Args: user_id: The user the wallet belongs to. wallet_name: Desired wallet name (case-folded, validated). private_key: The hex private key to import (``0x`` optional). redis_client: The async Redis client for persistence. Returns: Dict[str, Any]: ``wallet_name``, ``type`` (``"simple"``), ``address``, and ``created_at`` (the key itself is never returned). Raises: ValueError: If the name is invalid, the wallet already exists, or the private key is invalid. """ wallet_name = wallet_name.strip().lower() if not wallet_name or len(wallet_name) > 32: raise ValueError("Wallet name must be 1-32 characters") if not wallet_name.replace("_", "").replace("-", "").isalnum(): raise ValueError("Wallet name must be alphanumeric (with - or _)") await self._ensure_master_key(redis_client) if await self.wallet_exists(user_id, wallet_name, redis_client): raise ValueError(f"Wallet '{wallet_name}' already exists") if not private_key.startswith("0x"): private_key = "0x" + private_key if not self.is_valid_private_key(private_key): raise ValueError("Invalid private key") address = self.derive_address_from_private_key(private_key) encryption_key = self._derive_wallet_key(user_id, wallet_name) encrypted_seed = self._encrypt(encryption_key, private_key) wallet_data = { "type": "simple", "encrypted_seed": encrypted_seed, "addresses": {"0": address}, "created_at": datetime.utcnow().isoformat(), } wallet_key = f"{WALLET_KEY_PREFIX}{user_id}:{wallet_name}" await redis_client.set(wallet_key, json.dumps(wallet_data)) index_key = f"{WALLET_INDEX_PREFIX}{user_id}" await redis_client.sadd(index_key, wallet_name) logger.info(f"Imported simple wallet '{wallet_name}' for user {user_id}") return { "wallet_name": wallet_name, "type": "simple", "address": address, "created_at": wallet_data["created_at"], }
[docs] async def wallet_exists(self, user_id: str, wallet_name: str, redis_client, config: Any = None) -> bool: """Report whether a named wallet exists for the user (any alias). Expands the user into all linked identities via :meth:`_get_user_aliases` and checks each alias's ``eth_wallet:{alias}:{wallet_name}`` Redis key (``EXISTS``), returning ``True`` on the first hit so a wallet created on one platform is visible across them all. Reads Redis only. Called by :meth:`create_wallet` and :meth:`import_private_key` as a uniqueness guard, and directly by the identity-registry tests. Args: user_id: The user (raw or ``platform:user_id``). wallet_name: The wallet name to look up (case-folded). redis_client: The async Redis client. config: Optional config passed through to alias resolution. Returns: bool: ``True`` if the wallet exists under any alias, else ``False``. """ aliases = await self._get_user_aliases(redis_client, user_id, config) for alias in aliases: plat_uid = alias.split(":", 1)[1] if ":" in alias else alias wallet_key = f"{WALLET_KEY_PREFIX}{plat_uid}:{wallet_name.strip().lower()}" if await redis_client.exists(wallet_key) > 0: return True return False
[docs] async def get_wallet( self, user_id: str, wallet_name: str, redis_client, config: Any = None ) -> Optional[Dict[str, Any]]: """Fetch a wallet's public metadata (never its seed) for the user. Resolves the user's aliases via :meth:`_get_user_aliases` and reads each ``eth_wallet:{alias}:{wallet_name}`` Redis key (``GET``) until one hits, then returns the non-secret fields — name, type, derived addresses, and creation time — deliberately omitting the ``encrypted_seed``. Reads Redis only. Called by :meth:`list_wallets` and :meth:`get_private_key` internally, and directly by the wallet tool layers (``tools/eth_wallet_tools``, ``tools/btc_wallet_tools``) and tests. Args: user_id: The user (raw or ``platform:user_id``). wallet_name: The wallet name to fetch (case-folded). redis_client: The async Redis client. config: Optional config passed through to alias resolution. Returns: Optional[Dict[str, Any]]: ``wallet_name``, ``type``, ``addresses``, and ``created_at``, or ``None`` if no matching wallet exists. """ aliases = await self._get_user_aliases(redis_client, user_id, config) for alias in aliases: plat_uid = alias.split(":", 1)[1] if ":" in alias else alias wallet_key = f"{WALLET_KEY_PREFIX}{plat_uid}:{wallet_name.strip().lower()}" data = await redis_client.get(wallet_key) if data: wallet_data = json.loads(data) return { "wallet_name": wallet_name, "type": wallet_data.get("type", "hd"), "addresses": wallet_data.get("addresses", {}), "created_at": wallet_data.get("created_at"), } return None
[docs] async def get_decrypted_seed( self, user_id: str, wallet_name: str, redis_client, config: Any = None ) -> Optional[str]: """Decrypt and return a wallet's seed (mnemonic or private key). Loads the master key (:meth:`_ensure_master_key`), then for each of the user's aliases (:meth:`_get_user_aliases`) reads the ``eth_wallet:{alias}:{wallet_name}`` record from Redis, re-derives that wallet's key with :meth:`_derive_wallet_key`, and decrypts the stored ``encrypted_seed`` via :meth:`_decrypt`. Because the per-wallet key is salted with the owning alias, it retries the next alias on an :class:`InvalidTag` or :class:`ValueError` (logging the failure) so the right identity's key eventually matches. Returns the raw secret, so callers must treat it carefully. Reads Redis and performs decryption. Called internally by :meth:`derive_address` and :meth:`get_private_key`, and directly by ``tools/eth_wallet_tools``, ``tools/btc_wallet_tools``, and the tests. Args: user_id: The user (raw or ``platform:user_id``). wallet_name: The wallet name (case-folded). redis_client: The async Redis client. config: Optional config passed through to alias resolution. Returns: Optional[str]: The decrypted mnemonic or private key, or ``None`` if the wallet is missing, has no seed, or cannot be decrypted under any alias. """ wallet_name = wallet_name.strip().lower() await self._ensure_master_key(redis_client) aliases = await self._get_user_aliases(redis_client, user_id, config) for alias in aliases: plat_uid = alias.split(":", 1)[1] if ":" in alias else alias wallet_key = f"{WALLET_KEY_PREFIX}{plat_uid}:{wallet_name}" data = await redis_client.get(wallet_key) if data: wallet_data = json.loads(data) encrypted_seed = wallet_data.get("encrypted_seed") if not encrypted_seed: return None try: encryption_key = self._derive_wallet_key(plat_uid, wallet_name) return self._decrypt(encryption_key, encrypted_seed) except (InvalidTag, ValueError) as e: logger.error(f"Failed to decrypt wallet seed for alias {plat_uid}: {e}") continue return None
[docs] async def derive_address( self, user_id: str, wallet_name: str, index: int, redis_client, config: Any = None, ) -> Optional[str]: """Return the EVM address at a given index, deriving and caching it. Loads the master key, resolves aliases, and finds the wallet record in Redis. If the requested *index* is already in the cached ``addresses`` map it returns it directly. For a simple (non-HD) wallet only index 0 exists, so any other index raises. For an HD wallet it fetches the decrypted seed (:meth:`get_decrypted_seed`), derives the address with :meth:`derive_address_from_mnemonic`, writes the newly derived address back into the wallet record (a Redis ``SET``) so future lookups are free, and returns it. Reads and writes Redis and performs decryption. Called by ``tools/eth_wallet_tools`` for address display, sends, and balance checks, and by the identity-registry tests. Args: user_id: The user (raw or ``platform:user_id``). wallet_name: The wallet name (case-folded). index: The BIP44 address index to resolve. redis_client: The async Redis client. config: Optional config passed through to alias resolution. Returns: Optional[str]: The address at *index*, or ``None`` if the wallet or its seed cannot be found. Raises: ValueError: If a non-zero index is requested from a simple wallet. """ wallet_name = wallet_name.strip().lower() await self._ensure_master_key(redis_client) aliases = await self._get_user_aliases(redis_client, user_id, config) for alias in aliases: plat_uid = alias.split(":", 1)[1] if ":" in alias else alias wallet_key = f"{WALLET_KEY_PREFIX}{plat_uid}:{wallet_name}" data = await redis_client.get(wallet_key) if data: wallet_data = json.loads(data) addresses = wallet_data.get("addresses", {}) str_index = str(index) if str_index in addresses: return addresses[str_index] if wallet_data.get("type") != "hd": if index == 0: return addresses.get("0") raise ValueError( "Cannot derive additional addresses from a simple (non-HD) wallet" ) seed = await self.get_decrypted_seed(plat_uid, wallet_name, redis_client, config) if not seed: return None address, _ = self.derive_address_from_mnemonic(seed, index=index) addresses[str_index] = address wallet_data["addresses"] = addresses await redis_client.set(wallet_key, json.dumps(wallet_data)) return address return None
[docs] async def get_private_key( self, user_id: str, wallet_name: str, redis_client, index: int = 0, config: Any = None, ) -> Optional[str]: """Return the private key for a wallet address at a given index. Loads the master key and, for each alias, looks the wallet up via :meth:`get_wallet` and decrypts its seed via :meth:`get_decrypted_seed`. For a simple wallet the stored seed *is* the private key (only index 0 is valid); for an HD wallet it derives the key at *index* with :meth:`derive_address_from_mnemonic`. Reads Redis and performs decryption, and returns secret key material, so callers (e.g. transaction signing in ``tools/eth_wallet_tools``) must handle it carefully. Also exercised by the identity-registry tests. Args: user_id: The user (raw or ``platform:user_id``). wallet_name: The wallet name (case-folded). redis_client: The async Redis client. index: The BIP44 index for HD wallets (must be 0 for simple wallets). config: Optional config passed through to alias resolution. Returns: Optional[str]: The hex private key, or ``None`` if the wallet or its seed cannot be found. Raises: ValueError: If a non-zero index is requested from a simple wallet. """ wallet_name = wallet_name.strip().lower() await self._ensure_master_key(redis_client) aliases = await self._get_user_aliases(redis_client, user_id, config) for alias in aliases: plat_uid = alias.split(":", 1)[1] if ":" in alias else alias wallet = await self.get_wallet(plat_uid, wallet_name, redis_client, config) if wallet: seed = await self.get_decrypted_seed(plat_uid, wallet_name, redis_client, config) if not seed: return None if wallet["type"] == "simple": if index != 0: raise ValueError("Simple wallets only have one address (index 0)") return seed else: _, private_key = self.derive_address_from_mnemonic(seed, index=index) return private_key return None
[docs] async def list_wallets(self, user_id: str, redis_client, config: Any = None) -> List[Dict[str, Any]]: """List all of a user's wallets (public metadata) across aliases. Resolves the user's aliases (:meth:`_get_user_aliases`) and unions the members of each ``eth_wallet_index:{alias}`` Redis set (``SMEMBERS``, decoding any ``bytes``) into a de-duplicated set of names, then loads each one's public metadata via :meth:`get_wallet` and returns them sorted by creation time. Reads Redis only; never exposes seeds. Called by ``tools/eth_wallet_tools`` and ``tools/btc_wallet_tools`` to render a user's wallet list, and by the tests. Args: user_id: The user (raw or ``platform:user_id``). redis_client: The async Redis client. config: Optional config passed through to alias resolution. Returns: List[Dict[str, Any]]: Per-wallet metadata dicts (as returned by :meth:`get_wallet`), ordered by ``created_at``. """ aliases = await self._get_user_aliases(redis_client, user_id, config) all_wallet_names = set() for alias in aliases: plat_uid = alias.split(":", 1)[1] if ":" in alias else alias index_key = f"{WALLET_INDEX_PREFIX}{plat_uid}" wallet_names = await redis_client.smembers(index_key) for name in wallet_names: if isinstance(name, bytes): name = name.decode("utf-8") all_wallet_names.add(name) wallets = [] for name in all_wallet_names: wallet = await self.get_wallet(user_id, name, redis_client, config) if wallet: wallets.append(wallet) return sorted(wallets, key=lambda w: w.get("created_at", ""))
[docs] async def delete_wallet(self, user_id: str, wallet_name: str, redis_client, config: Any = None) -> bool: """Delete a named wallet for every alias of the user. Resolves the user's aliases (:meth:`_get_user_aliases`) and, for each, deletes the ``eth_wallet:{alias}:{wallet_name}`` key (``DEL``); when a key was actually removed it also drops the name from the ``eth_wallet_index:{alias}`` set (``SREM``) and logs the deletion. Reports success if any alias's wallet was removed. Writes to Redis. Called by ``tools/eth_wallet_tools`` and ``tools/btc_wallet_tools`` for the user-facing delete flow, and by the tests. Args: user_id: The user (raw or ``platform:user_id``). wallet_name: The wallet name to delete (case-folded). redis_client: The async Redis client. config: Optional config passed through to alias resolution. Returns: bool: ``True`` if a wallet was deleted under any alias, else ``False``. """ wallet_name = wallet_name.strip().lower() aliases = await self._get_user_aliases(redis_client, user_id, config) any_deleted = False for alias in aliases: plat_uid = alias.split(":", 1)[1] if ":" in alias else alias wallet_key = f"{WALLET_KEY_PREFIX}{plat_uid}:{wallet_name}" deleted = await redis_client.delete(wallet_key) if deleted: index_key = f"{WALLET_INDEX_PREFIX}{plat_uid}" await redis_client.srem(index_key, wallet_name) logger.info(f"Deleted wallet '{wallet_name}' for user alias {plat_uid}") any_deleted = True return any_deleted
wallet_manager = WalletManager()