Python driver with native interface for ClickHouse database providing high-performance connectivity and comprehensive data type support.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Built-in support for data compression algorithms including LZ4 and ZSTD with configurable block sizes, plus optional Cython extensions for performance-critical operations. Compression significantly reduces network traffic and can improve query performance for network-bound workloads.
Multiple compression algorithms with different performance and compression ratio characteristics.
# Compression algorithms (require optional dependencies)
# LZ4: Fast compression with good performance
# LZ4HC: Higher compression ratio, slower than LZ4
# ZSTD: Excellent compression ratio with good performance
# Installation requirements
# pip install clickhouse-driver[lz4] # For LZ4 support
# pip install clickhouse-driver[zstd] # For ZSTD support
# pip install clickhouse-driver[lz4,zstd] # For both algorithms

Configure compression at the client level to apply it to all connections and queries.
# Enable compression in client constructor
# Enable compression in the client constructor.
# NOTE(review): clickhouse-driver selects the compression algorithm through
# the `compression` argument itself — True enables the default codec, or pass
# a string naming the codec ('lz4', 'lz4hc', 'zstd'). There is no separate
# `compression_algorithm` keyword in the Client constructor.
client = Client(
    host='localhost',
    compression='lz4',            # Enable compression: True, 'lz4', 'lz4hc' or 'zstd'
    compress_block_size=1048576,  # Compression block size in bytes (1MB default)
)
# Alternative configuration styles
client = Client('localhost', compression='lz4') # Enable LZ4
client = Client('localhost', compression='zstd') # Enable ZSTD
client = Client('localhost', compression='lz4hc') # Enable LZ4HC

Override compression settings for individual queries through ClickHouse settings.
# Query-specific compression settings
result = client.execute(
'SELECT * FROM large_table',
settings={
'network_compression_method': 'zstd', # Algorithm for this query
'network_zstd_compression_level': 3, # ZSTD compression level (1-22)
'compress': 1, # Enable compression
'decompress': 1 # Enable decompression
}
)
# Available compression settings
compression_settings = {
'network_compression_method': 'lz4|lz4hc|zstd', # Algorithm choice
'network_zstd_compression_level': 1, # ZSTD level (1-22, default: 1)
'compress': 1, # Enable compression (0/1)
'decompress': 1 # Enable decompression (0/1)
}

Low-level compression interfaces for advanced usage (not typically needed for normal operations).
# Base compression interfaces (internal use)
class Compressor:
    """Abstract interface for block compressors.

    Algorithm-specific implementations (LZ4, LZ4HC, ZSTD) override
    :meth:`compress`; this base class only documents the contract.
    """

    def compress(self, data):
        """Compress a single data block.

        Parameters:
        - data: raw bytes to compress

        Returns:
        - bytes: the compressed representation of ``data``
        """
class Decompressor:
    """Abstract interface for block decompressors.

    Mirror image of :class:`Compressor`; algorithm-specific subclasses
    override :meth:`decompress`.
    """

    def decompress(self, data):
        """Decompress a single data block.

        Parameters:
        - data: compressed bytes

        Returns:
        - bytes: the decompressed payload
        """
# Algorithm-specific implementations
# LZ4Compressor, LZ4Decompressor
# LZ4HCCompressor, LZ4HCDecompressor
# ZSTDCompressor, ZSTDDecompressor

Optional Cython extensions and performance tuning for high-throughput workloads.
# Cython extensions (automatically used if available)
# Built during installation for performance-critical operations:
# - bufferedreader: Fast binary data reading
# - bufferedwriter: Fast binary data writing
# - varint: Variable integer encoding/decoding
# - columns.largeint: Large integer processing
# Performance settings
client = Client(
'localhost',
compress_block_size=4194304, # Larger blocks: better compression, more memory
send_receive_timeout=300, # Longer timeout for large compressed data
sync_request_timeout=60 # Timeout for synchronous operations
)

| Algorithm | Compression Speed | Decompression Speed | Compression Ratio | Use Case |
|---|---|---|---|---|
| LZ4 | Very Fast | Very Fast | Good | Real-time, low latency |
| LZ4HC | Moderate | Very Fast | Better | Balanced performance |
| ZSTD | Fast | Fast | Excellent | Best overall choice |
# Small blocks (64KB - 256KB)
# - Lower memory usage
# - Faster response times
# - Less compression efficiency
client_small_blocks = Client(
'localhost',
compression='lz4',
compress_block_size=65536 # 64KB blocks
)
# Large blocks (1MB - 4MB)
# - Better compression ratios
# - Higher memory usage
# - Potential latency increase
client_large_blocks = Client(
'localhost',
compression='zstd',
compress_block_size=4194304 # 4MB blocks
)

from clickhouse_driver import Client
# Enable LZ4 compression (requires: pip install clickhouse-driver[lz4])
client = Client(
host='remote-server.example.com',
compression='lz4',
compress_block_size=1048576 # 1MB blocks
)
# Query with compression (automatically applied)
result = client.execute('SELECT * FROM large_table LIMIT 10000')
print(f"Retrieved {len(result)} rows with LZ4 compression")
client.disconnect()

# Enable ZSTD for best compression ratio (requires: pip install clickhouse-driver[zstd])
client = Client(
host='slow-network-server.example.com',
compression='zstd',
compress_block_size=2097152 # 2MB blocks for better compression
)
# Large data transfer with high compression
result = client.execute('''
SELECT user_id, event_data, timestamp, metadata
FROM user_events
WHERE date >= today() - 30
''', settings={
'network_zstd_compression_level': 6 # Higher compression level
})
print(f"Retrieved {len(result)} events with ZSTD compression")

import time
from clickhouse_driver import Client
def create_optimized_client(server_type='local'):
    """Build a Client whose compression settings suit the deployment.

    Parameters:
    - server_type: one of 'local', 'remote_fast' or 'remote_slow'

    Returns:
    - a configured Client, or None for an unrecognized server_type
      (matching the behavior of falling off the end of the dispatch).
    """
    # Map each deployment profile to its constructor arguments.
    profiles = {
        # Local server: no compression overhead, lowest latency.
        'local': (
            ('localhost',),
            {'compression': False},
        ),
        # Fast remote connection: cheap LZ4 with 1MB blocks.
        'remote_fast': (
            ('remote-server.example.com',),
            {'compression': 'lz4', 'compress_block_size': 1048576},
        ),
        # Slow/expensive connection: maximum ZSTD compression, 4MB blocks.
        'remote_slow': (
            ('slow-server.example.com',),
            {'compression': 'zstd',
             'compress_block_size': 4194304,
             'settings': {'network_zstd_compression_level': 9}},
        ),
    }
    if server_type not in profiles:
        return None
    args, kwargs = profiles[server_type]
    return Client(*args, **kwargs)
# Usage based on deployment
client = create_optimized_client('remote_slow')

import time
from clickhouse_driver import Client
def benchmark_compression(query, algorithms=('none', 'lz4', 'zstd')):
    """Benchmark query performance with different compression algorithms.

    Parameters:
    - query: SQL string, executed once per algorithm
    - algorithms: iterable of algorithm names; 'none' disables compression.
      (A tuple default replaces the original mutable-list default, which is
      a classic shared-state pitfall.)

    Returns:
    - dict mapping algorithm name -> {'duration', 'rows', 'rows_per_second'}
    """
    results = {}
    for algorithm in algorithms:
        # 'none' means an uncompressed connection; anything else is passed
        # straight through as the codec name.
        compression = False if algorithm == 'none' else algorithm
        client = Client('remote-server.example.com', compression=compression)
        try:
            # perf_counter is monotonic and high-resolution, unlike
            # time.time(), so it is the right clock for elapsed time.
            start_time = time.perf_counter()
            result = client.execute(query)
            duration = time.perf_counter() - start_time
        finally:
            # Release the connection even if the query raises (the original
            # leaked it on failure).
            client.disconnect()
        results[algorithm] = {
            'duration': duration,
            'rows': len(result),
            # Guard against a zero-length interval on very fast queries.
            'rows_per_second': len(result) / duration if duration else float('inf'),
        }
    return results
# Benchmark large query
query = 'SELECT * FROM large_table WHERE date >= today() - 7'
benchmark_results = benchmark_compression(query)
for algorithm, metrics in benchmark_results.items():
print(f"{algorithm}: {metrics['duration']:.2f}s, "
f"{metrics['rows_per_second']:.0f} rows/sec")

# Large streaming query with compression
client = Client(
'remote-server.example.com',
compression='zstd',
compress_block_size=2097152 # 2MB blocks
)
total_rows = 0
start_time = time.time()
# Stream large dataset with compression
for block in client.execute_iter('''
SELECT user_id, action, timestamp, details
FROM user_activity_log
WHERE date >= today() - 90
'''):
# Process each compressed block
for row in block:
process_user_activity(row)
total_rows += 1
if total_rows % 100000 == 0:
elapsed = time.time() - start_time
rate = total_rows / elapsed
print(f"Processed {total_rows:,} rows at {rate:.0f} rows/sec")
print(f"Total: {total_rows:,} rows processed with ZSTD compression")

import random
from datetime import datetime, timedelta
# Large INSERT with compression
client = Client(
'remote-server.example.com',
compression='lz4', # LZ4 for faster INSERT performance
compress_block_size=1048576
)
# Generate large dataset
def generate_sample_data(count):
    """Lazily yield `count` synthetic rows for the performance_test table.

    Each row is a 5-tuple: (id, username, created_at, value, category).
    """
    # Anchor timestamps 30 days in the past; random offsets cover that
    # 30-day (2,592,000-second) window.
    start = datetime.now() - timedelta(days=30)
    for row_id in range(count):
        name = f"user_{random.randint(1000, 9999)}"
        created = start + timedelta(seconds=random.randint(0, 2592000))
        amount = random.uniform(10.0, 1000.0)
        bucket = random.choice(['A', 'B', 'C', 'D'])
        yield (row_id, name, created, amount, bucket)
# Create table
client.execute('''
CREATE TABLE IF NOT EXISTS performance_test (
id UInt32,
username String,
created_at DateTime,
value Float64,
category Enum8('A'=1, 'B'=2, 'C'=3, 'D'=4)
) ENGINE = MergeTree()
ORDER BY (id, created_at)
''')
# Bulk insert with compression
print("Starting bulk insert with LZ4 compression...")
start_time = time.time()
# Insert in batches for optimal performance
batch_size = 100000
total_inserted = 0
for batch_start in range(0, 1000000, batch_size):
batch_data = list(generate_sample_data(batch_size))
client.execute(
'INSERT INTO performance_test VALUES',
batch_data,
settings={'async_insert': 1} # Async inserts for better performance
)
total_inserted += len(batch_data)
elapsed = time.time() - start_time
rate = total_inserted / elapsed
print(f"Inserted {total_inserted:,} rows at {rate:.0f} rows/sec")
print(f"Insert completed: {total_inserted:,} rows in {elapsed:.2f}s")

# Enable compression via connection URL
client = Client.from_url(
'clickhouse://user:pass@remote-server.example.com:9000/mydb'
'?compression=zstd&compress_block_size=2097152'
)
# URL parameters for compression
# compression=lz4|lz4hc|zstd
# compress_block_size=1048576
# secure=1 (for SSL + compression)

from clickhouse_driver import Client
from clickhouse_driver.errors import UnknownCompressionMethod
def test_compression_support():
    """Test which compression algorithms are available."""
    supported = []
    for algorithm in ('lz4', 'lz4hc', 'zstd'):
        try:
            client = Client('localhost', compression=algorithm)
            client.execute('SELECT 1')
            supported.append(algorithm)
            client.disconnect()
        except UnknownCompressionMethod:
            # The optional codec package is not installed.
            print(f"✗ {algorithm} compression not available")
            print(f" Install with: pip install clickhouse-driver[{algorithm}]")
        except Exception as e:
            # Anything else (server unreachable, auth, ...) — report, keep going.
            print(f"? {algorithm} test failed: {e}")
        else:
            print(f"✓ {algorithm} compression supported")
    return supported
# Check compression support
supported_algorithms = test_compression_support()
print(f"Supported compression algorithms: {supported_algorithms}")
# Fall back to uncompressed if needed
if supported_algorithms:
best_algorithm = supported_algorithms[0] # Use first available
client = Client('remote-server.example.com', compression=best_algorithm)
else:
client = Client('remote-server.example.com', compression=False)
print("Using uncompressed connection")

Install with Tessl CLI
npx tessl i tessl/pypi-clickhouse-driver