tessl/pypi-clickhouse-driver

Python driver with native interface for ClickHouse database providing high-performance connectivity and comprehensive data type support.

Compression and Performance

Built-in support for data compression algorithms including LZ4 and ZSTD with configurable block sizes, plus optional Cython extensions for performance-critical operations. Compression significantly reduces network traffic and can improve query performance for network-bound workloads.

Capabilities

Compression Algorithm Support

Multiple compression algorithms with different performance and compression ratio characteristics.

# Compression algorithms (require optional dependencies)
# LZ4: Fast compression with good performance
# LZ4HC: Higher compression ratio, slower than LZ4
# ZSTD: Excellent compression ratio with good performance

# Installation requirements
# pip install clickhouse-driver[lz4]    # For LZ4 support
# pip install clickhouse-driver[zstd]   # For ZSTD support
# pip install clickhouse-driver[lz4,zstd]  # For both algorithms
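
As a quick sanity check, you can verify which optional compression packages are importable before enabling them. This is a sketch using only the standard library; the package mapping reflects the extras listed above.

```python
import importlib.util

# The lz4 and lz4hc algorithms both rely on the `lz4` package;
# zstd relies on the `zstd` package (both pulled in by the extras above).
REQUIRED_PACKAGES = {
    'lz4': 'lz4',
    'lz4hc': 'lz4',
    'zstd': 'zstd',
}

def available_algorithms():
    """Return the compression algorithms whose backing packages are installed."""
    return sorted(
        algo for algo, pkg in REQUIRED_PACKAGES.items()
        if importlib.util.find_spec(pkg) is not None
    )

print(available_algorithms())
```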

Client Compression Configuration

Configure compression at client level for all connections and queries.

# Enable compression in the client constructor
client = Client(
    host='localhost',
    compression=True,                    # True enables LZ4, the default algorithm
    compress_block_size=1048576          # Compression block size in bytes (default: 1 MB)
)

# Alternative configuration styles
client = Client('localhost', compression='lz4')        # Enable LZ4
client = Client('localhost', compression='zstd')       # Enable ZSTD  
client = Client('localhost', compression='lz4hc')      # Enable LZ4HC

Query-Level Compression Settings

Override compression settings for individual queries through ClickHouse settings.

# Query-specific compression settings
result = client.execute(
    'SELECT * FROM large_table',
    settings={
        'network_compression_method': 'zstd',        # Algorithm for this query
        'network_zstd_compression_level': 3          # ZSTD level (1-22)
    }
)

# Relevant ClickHouse server settings
compression_settings = {
    'network_compression_method': 'zstd',   # 'lz4' or 'zstd'
    'network_zstd_compression_level': 1     # ZSTD level (1-22, default: 1)
}

Compression Algorithm Classes

Low-level compression interfaces for advanced usage (not typically needed for normal operations).

# Base compression interfaces (simplified sketch; the internal classes in
# clickhouse_driver.compression differ in naming and detail)
class Compressor:
    """Base compressor interface."""
    
    def compress(self, data):
        """
        Compress data block.
        
        Parameters:
        - data: bytes to compress
        
        Returns:
        - bytes: compressed data
        """

class Decompressor:
    """Base decompressor interface."""
    
    def decompress(self, data):
        """
        Decompress data block.
        
        Parameters:
        - data: compressed bytes
        
        Returns:
        - bytes: decompressed data  
        """

# Algorithm-specific implementations
# LZ4Compressor, LZ4Decompressor
# LZ4HCCompressor, LZ4HCDecompressor  
# ZSTDCompressor, ZSTDDecompressor

Performance Optimization Features

Optional Cython extensions and performance tuning for high-throughput workloads.

# Cython extensions (automatically used if available)
# Built during installation for performance-critical operations:
# - bufferedreader: Fast binary data reading
# - bufferedwriter: Fast binary data writing  
# - varint: Variable integer encoding/decoding
# - columns.largeint: Large integer processing

# Performance settings
client = Client(
    'localhost',
    compress_block_size=4194304,         # Larger blocks: better compression, more memory
    send_receive_timeout=300,            # Longer timeout for large compressed data
    sync_request_timeout=60              # Timeout for synchronous operations
)
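
To illustrate what the `varint` extension accelerates, here is a minimal pure-Python sketch of the LEB128-style variable-length integer encoding used by the native protocol. It is an illustration only, not the driver's actual implementation.

```python
def write_varint(n: int) -> bytes:
    """Encode an unsigned int as a varint (7 data bits per byte)."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # high bit set: more bytes follow
        else:
            out.append(byte)
            return bytes(out)

def read_varint(data: bytes) -> int:
    """Decode a varint back into an int."""
    result, shift = 0, 0
    for byte in data:
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            break
        shift += 7
    return result

print(write_varint(300))  # b'\xac\x02'
```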

Compression Performance Characteristics

Algorithm Comparison

| Algorithm | Compression Speed | Decompression Speed | Compression Ratio | Use Case |
| --------- | ----------------- | ------------------- | ----------------- | -------- |
| LZ4       | Very fast         | Very fast           | Good              | Real-time, low latency |
| LZ4HC     | Moderate          | Very fast           | Better            | Balanced performance |
| ZSTD      | Fast              | Fast                | Excellent         | Best overall choice |
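
The trade-offs above can be folded into a simple selection helper. This is a sketch; the bandwidth thresholds are illustrative assumptions, not measured values.

```python
def pick_compression(bandwidth_mbps, cpu_bound=False):
    """Illustrative heuristic: trade CPU for bandwidth as links get slower."""
    if cpu_bound or bandwidth_mbps >= 1000:
        return False          # local/fast link: skip compression entirely
    if bandwidth_mbps >= 100:
        return 'lz4'          # cheap compression, modest ratio
    return 'zstd'             # slow link: spend CPU for a better ratio

print(pick_compression(1000))  # False
print(pick_compression(200))   # 'lz4'
print(pick_compression(10))    # 'zstd'
```

The return value can be passed straight to the `compression` argument of `Client`.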

Block Size Impact

# Small blocks (64KB - 256KB)
# - Lower memory usage
# - Faster response times
# - Less compression efficiency

client_small_blocks = Client(
    'localhost',
    compression='lz4',
    compress_block_size=65536  # 64KB blocks
)

# Large blocks (1MB - 4MB)  
# - Better compression ratios
# - Higher memory usage
# - Potential latency increase

client_large_blocks = Client(
    'localhost', 
    compression='zstd',
    compress_block_size=4194304  # 4MB blocks
)

Usage Examples

Basic Compression Setup

from clickhouse_driver import Client

# Enable LZ4 compression (requires: pip install clickhouse-driver[lz4])
client = Client(
    host='remote-server.example.com',
    compression='lz4',
    compress_block_size=1048576  # 1MB blocks
)

# Query with compression (automatically applied)
result = client.execute('SELECT * FROM large_table LIMIT 10000')
print(f"Retrieved {len(result)} rows with LZ4 compression")

client.disconnect()

ZSTD High Compression

# Enable ZSTD for best compression ratio (requires: pip install clickhouse-driver[zstd])
client = Client(
    host='slow-network-server.example.com',
    compression='zstd',
    compress_block_size=2097152  # 2MB blocks for better compression
)

# Large data transfer with high compression
result = client.execute('''
    SELECT user_id, event_data, timestamp, metadata
    FROM user_events 
    WHERE date >= today() - 30
''', settings={
    'network_zstd_compression_level': 6  # Higher compression level
})

print(f"Retrieved {len(result)} events with ZSTD compression")

Adaptive Compression Strategy

from clickhouse_driver import Client

def create_optimized_client(server_type='local'):
    """Create client with compression optimized for server type."""
    
    if server_type == 'local':
        # Local server: minimal compression for lowest latency
        return Client(
            'localhost',
            compression=False  # No compression overhead
        )
    elif server_type == 'remote_fast':
        # Fast remote connection: balanced compression
        return Client(
            'remote-server.example.com',
            compression='lz4',
            compress_block_size=1048576
        )
    elif server_type == 'remote_slow':
        # Slow/expensive connection: maximum compression
        return Client(
            'slow-server.example.com',
            compression='zstd',
            compress_block_size=4194304,
            settings={
                'network_zstd_compression_level': 9
            }
        )
    raise ValueError(f"Unknown server type: {server_type}")

# Usage based on deployment
client = create_optimized_client('remote_slow')

Compression Performance Measurement

import time
from clickhouse_driver import Client

def benchmark_compression(query, algorithms=('none', 'lz4', 'zstd')):
    """Benchmark query performance with different compression algorithms."""
    
    results = {}
    
    for algorithm in algorithms:
        if algorithm == 'none':
            client = Client('remote-server.example.com', compression=False)
        else:
            client = Client('remote-server.example.com', compression=algorithm)
        
        start_time = time.time()
        result = client.execute(query)
        end_time = time.time()
        
        results[algorithm] = {
            'duration': end_time - start_time,
            'rows': len(result),
            'rows_per_second': len(result) / (end_time - start_time)
        }
        
        client.disconnect()
    
    return results

# Benchmark large query
query = 'SELECT * FROM large_table WHERE date >= today() - 7'
benchmark_results = benchmark_compression(query)

for algorithm, metrics in benchmark_results.items():
    print(f"{algorithm}: {metrics['duration']:.2f}s, "
          f"{metrics['rows_per_second']:.0f} rows/sec")

Streaming with Compression

import time

from clickhouse_driver import Client

# Large streaming query with compression
client = Client(
    'remote-server.example.com',
    compression='zstd',
    compress_block_size=2097152  # 2MB blocks
)

total_rows = 0
start_time = time.time()

# Stream the large dataset with compression; execute_iter yields rows one at a time
for row in client.execute_iter('''
    SELECT user_id, action, timestamp, details
    FROM user_activity_log
    WHERE date >= today() - 90
'''):
    process_user_activity(row)  # placeholder for application-specific handling
    total_rows += 1

    if total_rows % 100000 == 0:
        elapsed = time.time() - start_time
        rate = total_rows / elapsed
        print(f"Processed {total_rows:,} rows at {rate:.0f} rows/sec")

print(f"Total: {total_rows:,} rows processed with ZSTD compression")

INSERT Performance with Compression

import random
import time
from datetime import datetime, timedelta

from clickhouse_driver import Client

# Large INSERT with compression
client = Client(
    'remote-server.example.com',
    compression='lz4',  # LZ4 for faster INSERT performance
    compress_block_size=1048576
)

# Generate large dataset
def generate_sample_data(count):
    base_date = datetime.now() - timedelta(days=30)
    
    for i in range(count):
        yield (
            i,
            f"user_{random.randint(1000, 9999)}",
            base_date + timedelta(seconds=random.randint(0, 2592000)),
            random.uniform(10.0, 1000.0),
            random.choice(['A', 'B', 'C', 'D'])
        )

# Create table
client.execute('''
    CREATE TABLE IF NOT EXISTS performance_test (
        id UInt32,
        username String,
        created_at DateTime,
        value Float64,
        category Enum8('A'=1, 'B'=2, 'C'=3, 'D'=4)
    ) ENGINE = MergeTree()
    ORDER BY (id, created_at)
''')

# Bulk insert with compression
print("Starting bulk insert with LZ4 compression...")
start_time = time.time()

# Insert in batches for optimal performance
batch_size = 100000
total_inserted = 0

for batch_start in range(0, 1000000, batch_size):
    batch_data = list(generate_sample_data(batch_size))
    
    client.execute(
        'INSERT INTO performance_test VALUES',
        batch_data
    )
    
    total_inserted += len(batch_data)
    elapsed = time.time() - start_time
    rate = total_inserted / elapsed
    
    print(f"Inserted {total_inserted:,} rows at {rate:.0f} rows/sec")

print(f"Insert completed: {total_inserted:,} rows in {elapsed:.2f}s")

Connection URL with Compression

# Enable compression via connection URL
client = Client.from_url(
    'clickhouse://user:pass@remote-server.example.com:9000/mydb'
    '?compression=zstd&compress_block_size=2097152'
)

# URL parameters for compression
# compression=lz4|lz4hc|zstd
# compress_block_size=1048576
# secure=1 (for SSL + compression)
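
If the URL is assembled programmatically, the standard library's `urllib.parse` keeps the query string well-formed. This is a sketch; the host, credentials, and database are placeholders.

```python
from urllib.parse import quote, urlencode

def build_clickhouse_url(host, database, user, password, **params):
    """Assemble a clickhouse:// URL with query-string options."""
    query = urlencode(params)
    return (f'clickhouse://{quote(user)}:{quote(password)}'
            f'@{host}:9000/{database}?{query}')

url = build_clickhouse_url(
    'remote-server.example.com', 'mydb', 'user', 'pass',
    compression='zstd', compress_block_size=2097152,
)
print(url)
```

The resulting string can be handed to `Client.from_url` as shown above.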

Troubleshooting Compression Issues

from clickhouse_driver import Client
from clickhouse_driver.errors import UnknownCompressionMethod

def test_compression_support():
    """Test which compression algorithms are available."""
    
    algorithms = ['lz4', 'lz4hc', 'zstd']
    supported = []
    
    for algorithm in algorithms:
        try:
            client = Client('localhost', compression=algorithm)
            client.execute('SELECT 1')
            supported.append(algorithm)
            client.disconnect()
            print(f"✓ {algorithm} compression supported")
            
        except UnknownCompressionMethod:
            print(f"✗ {algorithm} compression not available")
            print(f"  Install with: pip install clickhouse-driver[{algorithm}]")
        except Exception as e:
            print(f"? {algorithm} test failed: {e}")
    
    return supported

# Check compression support
supported_algorithms = test_compression_support()
print(f"Supported compression algorithms: {supported_algorithms}")

# Fall back to uncompressed if needed
if supported_algorithms:
    best_algorithm = supported_algorithms[0]  # Use first available
    client = Client('remote-server.example.com', compression=best_algorithm)
else:
    client = Client('remote-server.example.com', compression=False)
    print("Using uncompressed connection")

Install with Tessl CLI

npx tessl i tessl/pypi-clickhouse-driver
