"""
Bitcoin Wallet Manager Module
Handles HD wallet creation, derivation, and encrypted storage in Redis.
Supports BIP39 mnemonics and BIP84 derivation paths (m/84'/0'/0'/0/x) for Native SegWit.
"""
import os
import json
import base64
import hashlib
import logging
from typing import Optional, Dict, Any, List
from datetime import datetime
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.exceptions import InvalidTag
from btc_networks import get_btc_network, BTCNetworkConfig
from wallet_key_utils import ensure_master_key
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Redis key prefix for a wallet record; the full key is
# "<prefix><user_id>:<wallet_name>".
BTC_WALLET_KEY_PREFIX = "btc_wallet:"
# Redis key prefix for the per-user set of wallet names; the full key is
# "<prefix><user_id>".
BTC_WALLET_INDEX_PREFIX = "btc_wallet_index:"
# Environment variable name handed to ensure_master_key().
# NOTE(review): presumably an optional source/override for the master key --
# confirm in wallet_key_utils.
BTC_WALLET_MASTER_KEY_ENV = "BTC_WALLET_MASTER_KEY"
# Redis key name handed to ensure_master_key() for the BTC master key.
BTC_WALLET_MASTER_REDIS_KEY = "wallet_master_key:btc"
[docs]
class BTCWalletManager:
    """Manage Bitcoin HD and single-key wallets with encrypted storage in Redis.

    Features:
        - BIP39 mnemonic generation and import
        - BIP84 derivation (m/84'/<coin>'/0'/0/x) for Native SegWit
        - AES-256-GCM encryption for seeds at rest
        - Per-user wallet isolation (per-wallet encryption key and Redis key)
        - Address caching for derived addresses

    Every method that talks to storage takes an async Redis client as a
    parameter; the manager itself only holds the master key in memory.
    """

    def __init__(self):
        """Create a manager whose master key has not been loaded yet."""
        self._master_key: Optional[bytes] = None

    async def _ensure_master_key(self, redis_client) -> None:
        """Load the master key through ``ensure_master_key`` and keep it on self.

        Args:
            redis_client: Async Redis client forwarded to ``ensure_master_key``.
        """
        self._master_key = await ensure_master_key(
            self._master_key,
            redis_client,
            BTC_WALLET_MASTER_REDIS_KEY,
            BTC_WALLET_MASTER_KEY_ENV,
        )

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _validate_wallet_name(wallet_name: str) -> str:
        """Strip and validate a wallet name.

        The name becomes part of a Redis key, so it is restricted to
        alphanumerics plus ``_`` and ``-``; in particular ``:`` is rejected
        because it is the separator in ``<prefix><user_id>:<wallet_name>``.

        Args:
            wallet_name (str): Name supplied by the caller.

        Returns:
            str: The stripped, validated name.

        Raises:
            ValueError: If the name is empty, longer than 32 characters, or
                contains characters other than alphanumerics, ``_`` or ``-``.
        """
        name = wallet_name.strip()
        if not name or len(name) > 32:
            raise ValueError("Wallet name must be 1-32 characters")
        if not name.replace('_', '').replace('-', '').isalnum():
            raise ValueError("Wallet name must be alphanumeric (with _ or -)")
        return name

    @staticmethod
    def _wallet_redis_key(user_id: str, wallet_name: str) -> str:
        """Return the Redis key under which a wallet record is stored."""
        return f"{BTC_WALLET_KEY_PREFIX}{user_id}:{wallet_name}"

    @staticmethod
    def _resolve_network(network: str):
        """Look up a network configuration by name.

        Args:
            network (str): Network identifier understood by ``get_btc_network``.

        Returns:
            The network configuration object returned by ``get_btc_network``.

        Raises:
            ValueError: If ``get_btc_network`` returns a falsy value.
        """
        net_config = get_btc_network(network)
        if not net_config:
            raise ValueError(f"Unknown network: {network}")
        return net_config

    @staticmethod
    def _derive_hd_key(mnemonic: str, net_config, index):
        """Derive the BIP84 receive key ``m/84'/<coin>'/0'/0/<index>``.

        Args:
            mnemonic (str): Mnemonic phrase of the HD wallet.
            net_config: Network configuration; ``network_name`` and
                ``is_testnet`` are read from it.
            index: Child index of the address within the receive chain.

        Returns:
            The bitcoinlib key object for the derived path.

        Raises:
            ValueError: If ``index`` is negative or not interpretable as an
                integer.
        """
        try:
            is_negative = int(index) < 0
        except (TypeError, ValueError):
            raise ValueError(f"Invalid address index: {index!r}") from None
        if is_negative:
            raise ValueError("Address index must be non-negative")

        from bitcoinlib.keys import HDKey

        hdkey = HDKey.from_passphrase(mnemonic, network=net_config.network_name)
        # Coin type 1' is used for testnets, 0' otherwise.
        coin_type = "1'" if net_config.is_testnet else "0'"
        return hdkey.subkey_for_path(f"m/84'/{coin_type}/0'/0/{index}")

    async def _load_wallet_record(
        self, user_id: str, wallet_name: str, redis_client
    ) -> Optional[Dict[str, Any]]:
        """Fetch and decode the raw wallet record, including the encrypted seed.

        Returns:
            Optional[Dict[str, Any]]: The decoded record, or None when Redis
            holds no (or an empty) value for the wallet key.
        """
        data = await redis_client.get(self._wallet_redis_key(user_id, wallet_name))
        if not data:
            return None
        return json.loads(data)

    async def _store_wallet(
        self,
        user_id: str,
        wallet_name: str,
        wallet_type: str,
        secret: str,
        address: str,
        net_config,
        redis_client,
    ) -> Dict[str, Any]:
        """Encrypt ``secret``, persist the wallet record and index it.

        Args:
            user_id (str): Owner of the wallet.
            wallet_name (str): Validated wallet name.
            wallet_type (str): ``"hd"`` or ``"simple"``.
            secret (str): Mnemonic (HD) or WIF (simple) to encrypt at rest.
            address (str): Address cached under index ``"0"``.
            net_config: Network configuration of the wallet.
            redis_client: Async Redis client.

        Returns:
            Dict[str, Any]: Public summary (name, address, network, created_at).
        """
        encryption_key = self._derive_wallet_key(user_id, wallet_name)
        wallet_data = {
            "wallet_name": wallet_name,
            "type": wallet_type,
            "network": net_config.network_name,
            "encrypted_seed": self._encrypt(encryption_key, secret),
            "addresses": {"0": address},
            "created_at": datetime.utcnow().isoformat()
        }
        await redis_client.set(
            self._wallet_redis_key(user_id, wallet_name), json.dumps(wallet_data)
        )
        await redis_client.sadd(f"{BTC_WALLET_INDEX_PREFIX}{user_id}", wallet_name)
        return {
            "wallet_name": wallet_name,
            "address": address,
            "network": net_config.name,
            "created_at": wallet_data["created_at"]
        }

    def _derive_wallet_key(self, user_id: str, wallet_name: str) -> bytes:
        """Derive the per-wallet AES key from the master key.

        Uses PBKDF2-HMAC-SHA256 (100000 iterations, 32-byte output) with the
        salt ``btc:<user_id>:<wallet_name>``, so each wallet gets its own key.

        Args:
            user_id (str): Unique identifier for the user.
            wallet_name (str): Name of the wallet.

        Returns:
            bytes: 32-byte encryption key.

        Raises:
            RuntimeError: If the master key has not been loaded yet.
        """
        if self._master_key is None:
            raise RuntimeError(
                "Master key not loaded; call _ensure_master_key() first"
            )
        salt = f"btc:{user_id}:{wallet_name}".encode('utf-8')
        return hashlib.pbkdf2_hmac(
            'sha256',
            self._master_key,
            salt,
            iterations=100000,
            dklen=32
        )

    def _encrypt(self, key: bytes, plaintext: str) -> str:
        """Encrypt ``plaintext`` with AES-GCM.

        Args:
            key (bytes): AES key.
            plaintext (str): Text to encrypt (UTF-8 encoded before encryption).

        Returns:
            str: URL-safe base64 of ``nonce || ciphertext`` where the nonce is
            12 random bytes.
        """
        aesgcm = AESGCM(key)
        nonce = os.urandom(12)
        ciphertext = aesgcm.encrypt(nonce, plaintext.encode('utf-8'), None)
        return base64.urlsafe_b64encode(nonce + ciphertext).decode('utf-8')

    def _decrypt(self, key: bytes, encrypted: str) -> Optional[str]:
        """Decrypt a value produced by ``_encrypt``.

        Args:
            key (bytes): AES key.
            encrypted (str): URL-safe base64 of ``nonce || ciphertext``.

        Returns:
            Optional[str]: The plaintext, or None if decoding, authentication
            or decryption fails (the failure is logged).
        """
        try:
            aesgcm = AESGCM(key)
            data = base64.urlsafe_b64decode(encrypted)
            nonce, ciphertext = data[:12], data[12:]
            return aesgcm.decrypt(nonce, ciphertext, None).decode('utf-8')
        except Exception as e:
            # Deliberately best-effort: any failure (bad tag, bad base64,
            # bad key size, ...) is reported to the caller as None.
            logger.error("Decryption failed: %s", e)
            return None

    # ------------------------------------------------------------------
    # Mnemonics
    # ------------------------------------------------------------------

    def generate_mnemonic(self, strength: int = 128) -> str:
        """Generate a new mnemonic phrase with bitcoinlib.

        Args:
            strength (int): Strength value forwarded to
                ``Mnemonic.generate`` (default 128).

        Returns:
            str: The generated mnemonic phrase.
        """
        from bitcoinlib.mnemonic import Mnemonic
        return Mnemonic().generate(strength=strength)

    def validate_mnemonic(self, mnemonic: str) -> bool:
        """Check whether ``mnemonic`` is a valid phrase.

        Args:
            mnemonic (str): Mnemonic phrase to check.

        Returns:
            bool: True if the phrase validates, False on any validation error.
        """
        from bitcoinlib.mnemonic import Mnemonic
        try:
            m = Mnemonic()
            check = getattr(m, "check", None)
            if callable(check):
                return bool(check(mnemonic))
            # NOTE(review): bitcoinlib's Mnemonic may not expose check(); in
            # that case fall back to to_entropy(), which raises when a word is
            # unknown or the checksum does not match -- confirm against the
            # installed bitcoinlib version.
            m.to_entropy(mnemonic)
            return True
        except Exception:
            return False

    # ------------------------------------------------------------------
    # Wallet creation / import
    # ------------------------------------------------------------------

    async def create_wallet(
        self,
        user_id: str,
        wallet_name: str,
        mnemonic: str,
        redis_client,
        network: str = "bitcoin"
    ) -> Dict[str, Any]:
        """Create an HD wallet from a mnemonic and store it encrypted.

        Args:
            user_id (str): Unique identifier for the user.
            wallet_name (str): Wallet name (1-32 chars, alphanumeric, _ or -).
            mnemonic (str): Mnemonic phrase of the wallet.
            redis_client: Async Redis client.
            network (str): Network identifier (default ``"bitcoin"``).

        Returns:
            Dict[str, Any]: ``wallet_name``, first ``address``, ``network``
            and ``created_at``.

        Raises:
            ValueError: On an invalid name, an existing wallet of that name,
                an invalid mnemonic, or an unknown network.
        """
        wallet_name = self._validate_wallet_name(wallet_name)
        await self._ensure_master_key(redis_client)
        if await self.get_wallet(user_id, wallet_name, redis_client):
            raise ValueError(f"Wallet '{wallet_name}' already exists")
        if not self.validate_mnemonic(mnemonic):
            raise ValueError("Invalid mnemonic phrase")
        net_config = self._resolve_network(network)

        first_key = self._derive_hd_key(mnemonic, net_config, 0)
        address = first_key.address(encoding='bech32')
        return await self._store_wallet(
            user_id, wallet_name, "hd", mnemonic, address, net_config, redis_client
        )

    async def import_wif(
        self,
        user_id: str,
        wallet_name: str,
        wif: str,
        redis_client,
        network: str = "bitcoin"
    ) -> Dict[str, Any]:
        """Import a single private key (WIF) as a "simple" wallet.

        Args:
            user_id (str): Unique identifier for the user.
            wallet_name (str): Wallet name (1-32 chars, alphanumeric, _ or -).
            wif (str): Private key in WIF form.
            redis_client: Async Redis client.
            network (str): Network identifier (default ``"bitcoin"``).

        Returns:
            Dict[str, Any]: ``wallet_name``, ``address``, ``network`` and
            ``created_at``.

        Raises:
            ValueError: On an invalid name, an existing wallet of that name,
                an unknown network, or a WIF bitcoinlib cannot parse.
        """
        # Same name rules as create_wallet: the name is embedded in Redis keys.
        wallet_name = self._validate_wallet_name(wallet_name)
        await self._ensure_master_key(redis_client)
        if await self.get_wallet(user_id, wallet_name, redis_client):
            raise ValueError(f"Wallet '{wallet_name}' already exists")
        net_config = self._resolve_network(network)

        from bitcoinlib.keys import Key
        try:
            key = Key(wif, network=net_config.network_name)
            address = key.address(encoding='bech32')
        except Exception as e:
            raise ValueError(f"Invalid WIF key: {e}") from e

        return await self._store_wallet(
            user_id, wallet_name, "simple", wif, address, net_config, redis_client
        )

    # ------------------------------------------------------------------
    # Read access
    # ------------------------------------------------------------------

    async def get_wallet(
        self, user_id: str, wallet_name: str, redis_client
    ) -> Optional[Dict[str, Any]]:
        """Return the wallet record without its encrypted seed.

        Args:
            user_id (str): Unique identifier for the user.
            wallet_name (str): Name of the wallet.
            redis_client: Async Redis client.

        Returns:
            Optional[Dict[str, Any]]: The record minus ``encrypted_seed``, or
            None if the wallet does not exist.
        """
        wallet = await self._load_wallet_record(user_id, wallet_name, redis_client)
        if wallet is None:
            return None
        wallet.pop("encrypted_seed", None)
        return wallet

    async def list_wallets(self, user_id: str, redis_client) -> List[Dict[str, Any]]:
        """List all wallets of a user, oldest first.

        Args:
            user_id (str): Unique identifier for the user.
            redis_client: Async Redis client.

        Returns:
            List[Dict[str, Any]]: Wallet records (without encrypted seeds)
            sorted by ``created_at``; index entries without a record are
            skipped.
        """
        wallet_names = await redis_client.smembers(f"{BTC_WALLET_INDEX_PREFIX}{user_id}")
        wallets = []
        for name in wallet_names:
            # The client may return bytes when response decoding is disabled.
            if isinstance(name, bytes):
                name = name.decode('utf-8')
            wallet = await self.get_wallet(user_id, name, redis_client)
            if wallet:
                wallets.append(wallet)
        return sorted(wallets, key=lambda w: w.get("created_at", ""))

    async def derive_address(
        self,
        user_id: str,
        wallet_name: str,
        index: int,
        redis_client,
        address_type: str = "bech32"
    ) -> Optional[str]:
        """Return the address at ``index``, deriving and caching it if needed.

        Args:
            user_id (str): Unique identifier for the user.
            wallet_name (str): Name of the wallet.
            index (int): Address index on the receive chain.
            redis_client: Async Redis client.
            address_type (str): ``"bech32"`` (default), ``"p2sh"``, or anything
                else for bitcoinlib's default address encoding.

        Returns:
            Optional[str]: The address, or None if the wallet does not exist
            or its seed cannot be decrypted.

        Raises:
            ValueError: If a simple wallet is asked for an index other than 0,
                the index is invalid, or the stored network is unknown.
        """
        await self._ensure_master_key(redis_client)
        wallet = await self._load_wallet_record(user_id, wallet_name, redis_client)
        if wallet is None:
            return None

        addresses = wallet.setdefault("addresses", {})
        cache_key = f"{index}:{address_type}"
        if cache_key in addresses:
            return addresses[cache_key]
        # Wallet creation caches the first bech32 address under the bare
        # index ("0"); honour that entry instead of decrypting and re-deriving.
        if address_type == "bech32" and str(index) in addresses:
            return addresses[str(index)]

        if wallet["type"] == "simple":
            if index != 0:
                raise ValueError("Simple wallets only have one address (index 0)")
            return addresses.get("0")

        encryption_key = self._derive_wallet_key(user_id, wallet_name)
        mnemonic = self._decrypt(encryption_key, wallet["encrypted_seed"])
        if not mnemonic:
            return None

        net_config = self._resolve_network(wallet.get("network", "bitcoin"))
        derived = self._derive_hd_key(mnemonic, net_config, index)
        if address_type == "bech32":
            address = derived.address(encoding='bech32')
        elif address_type == "p2sh":
            address = derived.address(script_type='p2sh')
        else:
            address = derived.address()

        addresses[cache_key] = address
        await redis_client.set(
            self._wallet_redis_key(user_id, wallet_name), json.dumps(wallet)
        )
        return address

    async def get_private_key(
        self,
        user_id: str,
        wallet_name: str,
        redis_client,
        index: int = 0
    ) -> Optional[str]:
        """Return the private key material for a wallet address.

        For simple wallets the stored WIF is returned as-is and ``index`` is
        ignored. For HD wallets the key at ``m/84'/<coin>'/0'/0/<index>`` is
        derived and its ``wif()`` value returned.

        Args:
            user_id (str): Unique identifier for the user.
            wallet_name (str): Name of the wallet.
            redis_client: Async Redis client.
            index (int): Address index for HD wallets (default 0).

        Returns:
            Optional[str]: The key string, or None if the wallet does not
            exist or its seed cannot be decrypted.

        Raises:
            ValueError: If the index is invalid or the stored network is
                unknown (HD wallets only).
        """
        await self._ensure_master_key(redis_client)
        wallet = await self._load_wallet_record(user_id, wallet_name, redis_client)
        if wallet is None:
            return None

        encryption_key = self._derive_wallet_key(user_id, wallet_name)
        seed = self._decrypt(encryption_key, wallet["encrypted_seed"])
        if not seed:
            return None
        if wallet["type"] == "simple":
            return seed

        net_config = self._resolve_network(wallet.get("network", "bitcoin"))
        derived = self._derive_hd_key(seed, net_config, index)
        # NOTE(review): on a bitcoinlib HDKey, wif() may yield the extended
        # key serialization rather than a plain WIF -- confirm what callers
        # expect before changing this.
        return derived.wif()

    async def get_decrypted_seed(
        self, user_id: str, wallet_name: str, redis_client
    ) -> Optional[str]:
        """Return the decrypted secret (mnemonic or WIF) of a wallet.

        Args:
            user_id (str): Unique identifier for the user.
            wallet_name (str): Name of the wallet.
            redis_client: Async Redis client.

        Returns:
            Optional[str]: The decrypted secret, or None if the wallet does
            not exist or decryption fails.
        """
        await self._ensure_master_key(redis_client)
        wallet = await self._load_wallet_record(user_id, wallet_name, redis_client)
        if wallet is None:
            return None
        encryption_key = self._derive_wallet_key(user_id, wallet_name)
        return self._decrypt(encryption_key, wallet["encrypted_seed"])

    async def delete_wallet(self, user_id: str, wallet_name: str, redis_client) -> bool:
        """Delete a wallet record and remove it from the user's index.

        Args:
            user_id (str): Unique identifier for the user.
            wallet_name (str): Name of the wallet.
            redis_client: Async Redis client.

        Returns:
            bool: True if the wallet existed and was deleted, False otherwise.
        """
        redis_key = self._wallet_redis_key(user_id, wallet_name)
        if not await redis_client.exists(redis_key):
            return False
        await redis_client.delete(redis_key)
        await redis_client.srem(f"{BTC_WALLET_INDEX_PREFIX}{user_id}", wallet_name)
        return True
# Module-level manager instance created at import time; its master key stays
# unset until a method that needs it calls _ensure_master_key().
btc_wallet_manager = BTCWalletManager()