Source code for tools.evm_decompiler

"""
EVM Bytecode Decompiler Tool

Decompiles Ethereum (EVM) bytecode into readable Solidity-like code using Heimdall-rs.
Supports raw bytecode, contract addresses (with RPC), and ENS names.
"""

import asyncio
import json
import logging
import re
import subprocess
from typing import Optional

logger = logging.getLogger(__name__)

MAX_OUTPUT_LENGTH = 8000
DEFAULT_TIMEOUT = 180
DEFAULT_HEIMDALL_TIMEOUT = 120000

TOOL_NAME = "decompile_evm_bytecode"
TOOL_DESCRIPTION = (
    "Decompile EVM bytecode into readable Solidity-like code using Heimdall. "
    "Supports raw bytecode (0x...), contract addresses (with rpc_url), and ENS names. "
    "Useful for reverse engineering smart contracts, security analysis, "
    "and understanding contract behavior without source code."
)
TOOL_PARAMETERS = {
    "type": "object",
    "properties": {
        "target": {
            "type": "string",
            "description": (
                "The bytecode, contract address, or ENS name to decompile. "
                "Raw bytecode: hex starting with 0x. "
                "Contract address: 0x + 40 hex chars. "
                "ENS name: e.g. 'uniswap.eth'."
            ),
        },
        "rpc_url": {
            "type": "string",
            "description": (
                "RPC URL for fetching bytecode from blockchain. Required for "
                "contract addresses and ENS names. E.g. 'https://eth.llamarpc.com'."
            ),
        },
        "include_solidity": {
            "type": "boolean",
            "description": "Include Solidity-style decompiled output (default: true).",
        },
        "include_yul": {
            "type": "boolean",
            "description": "Include Yul intermediate representation (default: false).",
        },
        "skip_resolving": {
            "type": "boolean",
            "description": "Skip function selector resolution for faster results (default: false).",
        },
        "timeout": {
            "type": "integer",
            "description": "Timeout in ms for symbolic execution per function (default: 120000).",
        },
    },
    "required": ["target"],
}


def _is_contract_address(target: str) -> bool:
    """Internal helper: is contract address.

        Args:
            target (str): The target value.

        Returns:
            bool: True on success, False otherwise.
        """
    return bool(re.match(r'^0x[a-fA-F0-9]{40}$', target))


def _is_ens_name(target: str) -> bool:
    """Internal helper: is ens name.

        Args:
            target (str): The target value.

        Returns:
            bool: True on success, False otherwise.
        """
    return target.endswith('.eth') and not target.startswith('0x')


def _is_bytecode(target: str) -> bool:
    """Internal helper: is bytecode.

        Args:
            target (str): The target value.

        Returns:
            bool: True on success, False otherwise.
        """
    if not target.startswith('0x'):
        return False
    if len(target) <= 42:
        return False
    try:
        int(target, 16)
        return True
    except ValueError:
        return False


def _validate_target(target: str, rpc_url: Optional[str]) -> Optional[str]:
    """Internal helper: validate target.

        Args:
            target (str): The target value.
            rpc_url (Optional[str]): The rpc url value.

        Returns:
            Optional[str]: The result.
        """
    if not target:
        return "Target is required. Provide bytecode, contract address, or ENS name."

    target = target.strip()

    if _is_contract_address(target) or _is_ens_name(target):
        if not rpc_url:
            return (
                f"RPC URL is required to fetch bytecode for {'address' if _is_contract_address(target) else 'ENS name'} '{target}'. "
                "Provide an rpc_url parameter (e.g., 'https://eth.llamarpc.com' for Ethereum mainnet)."
            )
    elif target.startswith('0x'):
        if not _is_bytecode(target):
            return "Invalid bytecode format. Must be a valid hex string starting with '0x'."
    else:
        return (
            f"Unrecognized target format: '{target[:50]}...'. "
            "Provide raw bytecode (0x...), a contract address (0x + 40 hex chars), or an ENS name (*.eth)."
        )

    return None


[docs] async def run( target: str, rpc_url: Optional[str] = None, include_solidity: bool = True, include_yul: bool = False, skip_resolving: bool = False, timeout: int = DEFAULT_HEIMDALL_TIMEOUT, ) -> str: """Execute this tool and return the result. Args: target (str): The target value. rpc_url (Optional[str]): The rpc url value. include_solidity (bool): The include solidity value. include_yul (bool): The include yul value. skip_resolving (bool): The skip resolving value. timeout (int): Maximum wait time in seconds. Returns: str: Result string. """ try: target = target.strip() validation_error = _validate_target(target, rpc_url) if validation_error: return json.dumps({ "status": "error", "target": target[:100] if len(target) > 100 else target, "error": validation_error }) cmd = [ "heimdall", "decompile", target, "--output", "print", "--color", "never", "--quiet", "--timeout", str(timeout) ] if rpc_url: cmd.extend(["--rpc-url", rpc_url]) if include_solidity: cmd.append("--include-sol") if include_yul: cmd.append("--include-yul") if skip_resolving: cmd.append("--skip-resolving") logger.info(f"Running Heimdall decompile for target: {target[:50]}...") process = await asyncio.create_subprocess_exec( *cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=DEFAULT_TIMEOUT ) except asyncio.TimeoutError: process.kill() await process.wait() return json.dumps({ "status": "error", "target": target[:100] if len(target) > 100 else target, "error": f"Decompilation timed out after {DEFAULT_TIMEOUT} seconds. " "The contract may be too complex. Try with skip_resolving=True " "or increase the timeout parameter." }) stdout_text = stdout.decode(errors='replace') stderr_text = stderr.decode(errors='replace') if process.returncode != 0: error_msg = stderr_text.strip() or stdout_text.strip() or "Unknown error" error_msg = error_msg[:500] if len(error_msg) > 500 else error_msg return json.dumps({ "status": "error", "target": target[:100] if len(target) > 100 else target, "error": f"Heimdall decompilation failed: {error_msg}" }) decompiled_code = stdout_text.strip() if not decompiled_code: return json.dumps({ "status": "error", "target": target[:100] if len(target) > 100 else target, "error": "Decompilation produced no output. The bytecode may be invalid or empty." }) warning = None if len(decompiled_code) > MAX_OUTPUT_LENGTH: warning = f"Output truncated from {len(decompiled_code)} to {MAX_OUTPUT_LENGTH} characters." decompiled_code = decompiled_code[:MAX_OUTPUT_LENGTH] + "\n\n// ... [OUTPUT TRUNCATED] ..." function_count = len(re.findall(r'function\s+\w+\s*\(', decompiled_code)) response = { "status": "success", "target": target[:100] if len(target) > 100 else target, "decompiled_code": decompiled_code, "functions_found": function_count } if warning: response["warning"] = warning if stderr_text.strip() and process.returncode == 0: if warning: response["warning"] += f" Additional warnings: {stderr_text[:200]}" else: response["warning"] = stderr_text[:200] logger.info(f"Successfully decompiled target with {function_count} functions detected") return json.dumps(response) except FileNotFoundError: return json.dumps({ "status": "error", "target": target[:100] if len(target) > 100 else target, "error": "Heimdall is not installed or not found in PATH. " "Please install Heimdall-rs: https://github.com/Jon-Becker/heimdall-rs" }) except Exception as e: error_msg = str(e) logger.error(f"Error decompiling EVM bytecode: {error_msg}", exc_info=True) return json.dumps({ "status": "error", "target": target[:100] if len(target) > 100 else target, "error": f"Unexpected error during decompilation: {error_msg}" })