Common utility functions for Python code that interacts with Ethereum.
—
Cryptographic utilities centered around Keccak-256 hashing with flexible input handling. Essential for Ethereum hash computations including transaction hashes, block hashes, and Merkle tree operations.
Compute Keccak-256 hash with flexible input formats.
def keccak(primitive=None, hexstr=None, text=None) -> bytes:
    """
    Compute the Keccak-256 hash of the input data.

    Exactly one of ``primitive``, ``hexstr`` or ``text`` is expected.

    Args:
        primitive: Bytes, integer, or other primitive value.
        hexstr (str): Hex string input (with or without 0x prefix).
        text (str): UTF-8 text string input.

    Returns:
        bytes: 32-byte Keccak-256 hash.

    Raises:
        ValidationError: If no input provided or invalid format.
        TypeError: If multiple inputs provided.

    NOTE(review): this is a signature stub documenting the eth_utils function
    imported right after it; confirm the exception types listed above against
    the installed eth_utils version.
    """


from eth_utils import keccak, encode_hex
# Hash text string
text_hash = keccak(text="Hello, Ethereum!")
print(encode_hex(text_hash))
# Prints the digest as a 0x-prefixed hex string (32 bytes -> 64 hex digits).

# Hash bytes directly
data = b"Hello, Ethereum!"
bytes_hash = keccak(primitive=data)
print(encode_hex(bytes_hash))  # Same as above

# Hash hex string
hex_data = "0x48656c6c6f2c20457468657265756d21"  # "Hello, Ethereum!" in hex
hex_hash = keccak(hexstr=hex_data)
print(encode_hex(hex_hash))  # Same as above

# Hash integer
number = 12345
int_hash = keccak(primitive=number)
print(encode_hex(int_hash))


from eth_utils import keccak, encode_hex, to_bytes
def generate_address_from_pubkey(public_key_hex):
    """Generate an Ethereum address from an uncompressed public key.

    Args:
        public_key_hex (str): Hex-encoded public key, with or without a
            ``0x`` prefix. Either the raw 64-byte key (128 hex digits) or the
            65-byte form that starts with the ``04`` uncompressed-key marker
            (130 hex digits).

    Returns:
        str: ``encode_hex`` of the last 20 bytes of the Keccak-256 hash of
        the 64-byte key.

    Raises:
        ValueError: If, after removing the optional ``0x`` prefix and ``04``
            marker, the key is not exactly 128 hex digits long.
    """
    key_hex = public_key_hex
    if key_hex[:2] in ('0x', '0X'):
        key_hex = key_hex[2:]

    # Only treat a leading "04" as the uncompressed-key marker when the input
    # is 65 bytes long; a raw 64-byte key may legitimately begin with 0x04 and
    # must not lose its first byte.
    if len(key_hex) == 130 and key_hex.startswith('04'):
        key_hex = key_hex[2:]

    if len(key_hex) != 128:
        raise ValueError(
            f"Expected a 64-byte public key (128 hex digits), got {len(key_hex)} hex digits"
        )

    # Keccak-256 hash of the 64-byte key; the address is its last 20 bytes.
    pubkey_hash = keccak(hexstr=key_hex)
    return encode_hex(pubkey_hash[-20:])
# Example (mock public key)
pubkey = "0x04" + "a" * 128  # 0x04 marker followed by a 64-byte public key
address = generate_address_from_pubkey(pubkey)
print(f"Generated address: {address}")


from eth_utils import keccak, encode_hex, to_bytes
def calculate_transaction_hash(transaction_data):
    """Return a simplified, illustrative hash of a transaction dict.

    The ``nonce``, ``gasPrice``, ``gasLimit``, ``to``, ``value`` and ``data``
    entries are converted to bytes, concatenated in that order and hashed
    with Keccak-256. A real implementation would hash the RLP-encoded
    transaction instead, so the result is for demonstration only.

    Args:
        transaction_data (dict): Mapping with the six keys listed above.
            ``to`` and ``data`` are hex strings whose first two characters
            (the ``0x`` prefix) are dropped before decoding.

    Returns:
        str: ``encode_hex`` of the Keccak-256 digest of the joined fields.
    """
    fields = transaction_data
    # Order matters: the digest is taken over this exact concatenation.
    pieces = [
        to_bytes(primitive=fields['nonce']),
        to_bytes(primitive=fields['gasPrice']),
        to_bytes(primitive=fields['gasLimit']),
        bytes.fromhex(fields['to'][2:]),  # Remove 0x
        to_bytes(primitive=fields['value']),
        bytes.fromhex(fields['data'][2:]),  # Remove 0x
    ]
    digest = keccak(primitive=b"".join(pieces))
    return encode_hex(digest)
# Example transaction
tx = {
    'nonce': 0,
    'gasPrice': 20000000000,  # 20 gwei
    'gasLimit': 21000,
    'to': '0x742d35cc6634c0532925a3b8c17b1e8b4e1d1123',
    'value': 1000000000000000000,  # 1 ether in wei
    'data': '0x',
}
tx_hash = calculate_transaction_hash(tx)
print(f"Transaction hash: {tx_hash}")


from eth_utils import keccak, encode_hex
def calculate_event_topic(event_signature):
    """Return the topic hash for a smart contract event signature.

    Args:
        event_signature (str): Event signature text, e.g.
            ``"Transfer(address,address,uint256)"``.

    Returns:
        str: ``encode_hex`` of the Keccak-256 hash of the signature text.
    """
    return encode_hex(keccak(text=event_signature))
# ERC-20 event signatures
transfer_topic = calculate_event_topic("Transfer(address,address,uint256)")
approval_topic = calculate_event_topic("Approval(address,address,uint256)")
print(f"Transfer topic: {transfer_topic}")
print(f"Approval topic: {approval_topic}")

# Use in event filtering
event_filter = {
    "topics": [
        [transfer_topic, approval_topic]  # Either Transfer or Approval
    ]
}


from eth_utils import keccak, encode_hex
def hash_pair(left_hash, right_hash):
    """Hash two node hashes into their Merkle parent.

    The smaller value is always placed first before concatenation, so the
    result does not depend on the order in which the two hashes are passed.

    Args:
        left_hash (bytes): First node hash.
        right_hash (bytes): Second node hash.

    Returns:
        bytes: Keccak-256 hash of the ordered concatenation.
    """
    if left_hash > right_hash:
        ordered = right_hash + left_hash
    else:
        ordered = left_hash + right_hash
    return keccak(primitive=ordered)
def build_merkle_root(leaf_hashes):
    """Reduce a sequence of leaf hashes to a single Merkle root.

    Each level is formed by hashing adjacent pairs with ``hash_pair``; when a
    level has an odd number of nodes, the last node is paired with itself.

    Args:
        leaf_hashes: Sequence of leaf hashes.

    Returns:
        bytes: The root hash. An empty input yields 32 zero bytes and a
        single leaf is returned unchanged.
    """
    if not leaf_hashes:
        return b'\x00' * 32
    if len(leaf_hashes) == 1:
        return leaf_hashes[0]

    # Collapse one level at a time until only the root remains.
    level = leaf_hashes[:]
    while len(level) > 1:
        width = len(level)
        level = [
            hash_pair(level[pos], level[pos + 1] if pos + 1 < width else level[pos])
            for pos in range(0, width, 2)
        ]
    return level[0]
# Example: Build Merkle tree from transaction hashes
tx_data = ["tx1", "tx2", "tx3", "tx4"]
leaf_hashes = [keccak(text=tx) for tx in tx_data]
merkle_root = build_merkle_root(leaf_hashes)
print(f"Merkle root: {encode_hex(merkle_root)}")


from eth_utils import keccak, encode_hex, to_bytes
def create_contract_address(sender_address, nonce):
    """Calculate the address of a contract deployed with the CREATE opcode.

    The address is the last 20 bytes of ``keccak256(rlp([sender, nonce]))``.
    The RLP encoding is built by hand here because the list always has the
    same small shape: a 20-byte string followed by one integer.

    Args:
        sender_address (str): Hex address of the deploying account (20
            bytes), with or without a ``0x`` prefix.
        nonce (int): Nonce of the deploying account, ``0 <= nonce < 2**64``.

    Returns:
        str: ``encode_hex`` of the 20-byte contract address.

    Raises:
        ValueError: If the sender is not valid hex for exactly 20 bytes or
            the nonce is out of range.
    """
    has_prefix = sender_address[:2] in ('0x', '0X')
    sender_bytes = bytes.fromhex(sender_address[2:] if has_prefix else sender_address)
    if len(sender_bytes) != 20:
        raise ValueError(
            f"Sender address must be 20 bytes, got {len(sender_bytes)}"
        )
    if nonce < 0 or nonce >= 2 ** 64:
        raise ValueError(f"Nonce out of range: {nonce}")

    # RLP item for the nonce: 0 is the empty string (0x80), values below 0x80
    # encode as themselves, larger values get a length prefix.
    if nonce == 0:
        nonce_item = b'\x80'
    else:
        nonce_bytes = nonce.to_bytes((nonce.bit_length() + 7) // 8, 'big')
        if nonce < 0x80:
            nonce_item = nonce_bytes
        else:
            nonce_item = bytes([0x80 + len(nonce_bytes)]) + nonce_bytes

    # RLP item for the address: 0x80 + 20 = 0x94 length prefix, then the bytes.
    address_item = b'\x94' + sender_bytes

    # Short-list prefix: the payload is at most 21 + 9 bytes, well below 56.
    payload = address_item + nonce_item
    rlp_data = bytes([0xc0 + len(payload)]) + payload

    # Hash and take last 20 bytes
    hash_result = keccak(primitive=rlp_data)
    return encode_hex(hash_result[-20:])
def _create2_component(value, size, label):
    """Convert one CREATE2 input to bytes and check it is exactly ``size`` long."""
    if isinstance(value, str):
        digits = value[2:] if value[:2] in ('0x', '0X') else value
        raw = bytes.fromhex(digits)
    elif isinstance(value, int):
        # Integers are left-padded with zeros to the fixed field width.
        if value < 0 or value >= 1 << (8 * size):
            raise ValueError(f"{label} does not fit in {size} bytes: {value}")
        raw = value.to_bytes(size, 'big')
    else:
        raw = bytes(value)
    if len(raw) != size:
        raise ValueError(f"{label} must be {size} bytes, got {len(raw)}")
    return raw


def create2_contract_address(sender_address, salt, bytecode_hash):
    """Calculate the address of a contract deployed with the CREATE2 opcode.

    CREATE2 address = keccak256(0xff + sender + salt + keccak256(bytecode))[12:]

    Args:
        sender_address: Deploying address (20 bytes) as a hex string, with or
            without a ``0x`` prefix.
        salt: 32-byte salt as a hex string, bytes, or a non-negative integer
            (integers are zero-padded on the left to 32 bytes).
        bytecode_hash: Keccak-256 hash of the contract bytecode (32 bytes), as
            bytes or a hex string.

    Returns:
        str: ``encode_hex`` of the 20-byte contract address.

    Raises:
        ValueError: If a component is not valid hex or has the wrong length.
    """
    sender_bytes = _create2_component(sender_address, 20, "Sender address")
    salt_bytes = _create2_component(salt, 32, "Salt")
    bytecode_hash_bytes = _create2_component(bytecode_hash, 32, "Bytecode hash")

    # Combine all components; the fixed widths give an 85-byte preimage.
    combined = b'\xff' + sender_bytes + salt_bytes + bytecode_hash_bytes

    # Hash and take last 20 bytes
    hash_result = keccak(primitive=combined)
    return encode_hex(hash_result[-20:])
# Examples
sender = "0x742d35cc6634c0532925a3b8c17b1e8b4e1d1123"

# CREATE address
create_addr = create_contract_address(sender, 0)
print(f"CREATE address: {create_addr}")

# CREATE2 address
salt = "0x1234567890abcdef1234567890abcdef1234567890abcdef1234567890abcdef"
bytecode_hash = keccak(text="contract bytecode")
create2_addr = create2_contract_address(sender, salt, bytecode_hash)
print(f"CREATE2 address: {create2_addr}")


from eth_utils import keccak, encode_hex
def create_data_hash(data):
    """Return the Keccak-256 hash used for data integrity checks.

    Args:
        data: ``bytes`` are hashed as-is, ``str`` is hashed as text, and any
            other value is hashed via its ``str()`` representation.

    Returns:
        bytes: Keccak-256 hash of the input.
    """
    if isinstance(data, bytes):
        return keccak(primitive=data)
    # Everything that is not bytes is hashed as text; non-strings are
    # converted to their string representation first.
    as_text = data if isinstance(data, str) else str(data)
    return keccak(text=as_text)
def verify_data_integrity(data, expected_hash):
    """Check that ``data`` hashes to ``expected_hash``.

    Args:
        data: Value to hash with ``create_data_hash``.
        expected_hash (str): Expected digest as a hex string, with or without
            a ``0x`` prefix.

    Returns:
        bool: True when the computed hash equals the expected bytes.
    """
    actual = create_data_hash(data)
    if expected_hash.startswith('0x'):
        hex_digits = expected_hash[2:]
    else:
        hex_digits = expected_hash
    return actual == bytes.fromhex(hex_digits)
# Example usage
original_data = "Important blockchain data"
data_hash = create_data_hash(original_data)
hash_hex = encode_hex(data_hash)
print(f"Data hash: {hash_hex}")

# Later, verify integrity
is_valid = verify_data_integrity(original_data, hash_hex)
print(f"Data integrity valid: {is_valid}")  # True

# Check tampered data
tampered_data = "Important blockchain data!"  # Added exclamation
is_valid_tampered = verify_data_integrity(tampered_data, hash_hex)
print(f"Tampered data valid: {is_valid_tampered}")  # False


from eth_utils import keccak, ValidationError
def secure_hash(data_input):
    """Hash ``str`` or ``bytes`` input with Keccak-256, validating the type.

    Args:
        data_input (str | bytes): Text is hashed via ``keccak(text=...)``,
            bytes via ``keccak(primitive=...)``.

    Returns:
        bytes: Keccak-256 hash of the input.

    Raises:
        TypeError: If ``data_input`` is neither ``str`` nor ``bytes``.
        ValueError: If hashing raises ``ValidationError``; the original
            exception is attached as the cause.
    """
    # Reject unsupported types up front so the try block wraps only the
    # hashing call that can actually raise ValidationError.
    if not isinstance(data_input, (str, bytes)):
        raise TypeError(f"Unsupported input type: {type(data_input)}")

    try:
        if isinstance(data_input, str):
            # Ensure text is properly encoded
            return keccak(text=data_input)
        return keccak(primitive=data_input)
    except ValidationError as e:
        raise ValueError(f"Invalid input for hashing: {e}") from e
# Safe usage
try:
    result = secure_hash("valid input")
    print(f"Hash: {encode_hex(result)}")
except (TypeError, ValueError) as e:
    print(f"Error: {e}")


from eth_utils import keccak
def hash_equal(data1, data2):
    """Report whether two values have the same Keccak-256 hash.

    Args:
        data1: First value; ``str`` is hashed as text, anything else is
            passed to ``keccak`` as a primitive.
        data2: Second value, handled the same way.

    Returns:
        bool: True when both hashes are equal.
    """
    digests = [
        keccak(text=item) if isinstance(item, str) else keccak(primitive=item)
        for item in (data1, data2)
    ]
    return digests[0] == digests[1]
# Constant-time comparison for security
def secure_hash_compare(hash1, hash2):
    """Compare two hashes without short-circuiting on the first difference.

    Bytes-like inputs are compared with the standard library's
    ``hmac.compare_digest``. Other sequences of integers fall back to an
    XOR-accumulating loop that always walks every element.

    Args:
        hash1: First hash (``bytes``/``bytearray`` or a sequence of ints).
        hash2: Second hash, same kind as ``hash1``.

    Returns:
        bool: True when both hashes have the same length and contents.
    """
    import hmac

    bytes_like = (bytes, bytearray)
    if isinstance(hash1, bytes_like) and isinstance(hash2, bytes_like):
        return hmac.compare_digest(hash1, hash2)

    # Fallback for non-bytes sequences: accumulate all differences instead of
    # returning at the first mismatch.
    if len(hash1) != len(hash2):
        return False
    result = 0
    for a, b in zip(hash1, hash2):
        result |= a ^ b
    return result == 0