Python driver with native interface for ClickHouse database providing high-performance connectivity and comprehensive data type support.
Quality: Pending — not yet reviewed for adherence to best practices.
Impact: Pending — no eval scenarios have been run.
Multiple result formats and processing modes for efficient handling of query results including standard tuples, streaming iterators, progress tracking, and optional NumPy arrays. The driver provides flexible result processing to optimize memory usage and performance for different workload patterns.
Basic query result storage and access with metadata information.
class QueryResult:
    """
    Standard query result storage.

    Contains complete query results loaded into memory with
    optional column metadata information.
    """

    def __init__(self, data, columns_with_types=None):
        """
        Initialize query result.

        Parameters:
        - data: List of result row tuples
        - columns_with_types: Optional column metadata
        """
        # Original stub discarded its arguments; store them so the
        # documented properties below actually return them.
        self._data = data
        self._columns_with_types = columns_with_types

    @property
    def data(self):
        """
        Query result data.

        Returns:
        - List[Tuple]: Result rows as tuples
        """
        return self._data

    @property
    def columns_with_types(self):
        """
        Column metadata information.

        Returns:
        - List[Tuple]: Column info as (name, type) tuples
        """
        return self._columns_with_types

# Memory-efficient streaming results for processing large datasets
# without loading everything into memory.
class IterQueryResult:
    """
    Streaming query result iterator.

    Yields result blocks incrementally for memory-efficient
    processing of large query results.
    """

    def __init__(self, data_generator=None):
        """
        Initialize the streaming result.

        Parameters:
        - data_generator: Optional iterable of result blocks.

        NOTE(review): the original stub held no state, making the
        iterator protocol below non-functional.  An optional block
        source is accepted here; the default keeps no-argument
        construction working as before.
        """
        self._blocks = iter(data_generator) if data_generator is not None else iter(())
        self._columns_with_types = None

    def __iter__(self):
        """
        Iterator protocol support.

        Returns:
        - Iterator: Self as iterator
        """
        return self

    def __next__(self):
        """
        Get next result block.

        Returns:
        - List[Tuple]: Next block of result rows

        Raises:
        - StopIteration: When no more blocks available
        """
        return next(self._blocks)

    @property
    def columns_with_types(self):
        """
        Column metadata (available after first block).

        Returns:
        - List[Tuple]: Column info as (name, type) tuples
        """
        return self._columns_with_types

# Query results with execution progress information for monitoring
# long-running operations.
class ProgressQueryResult:
    """
    Query result with progress information.

    Generator yielding (progress, data) tuples during query execution
    for monitoring progress of long-running operations.
    """

    def __init__(self, progress_blocks=None):
        """
        Initialize the progress-aware result.

        Parameters:
        - progress_blocks: Optional iterable of (progress, block) pairs.

        NOTE(review): the original stub held no state; an optional
        source is accepted so iteration is functional.  The default
        keeps no-argument construction working as before.
        """
        self._progress_blocks = progress_blocks if progress_blocks is not None else ()
        self._columns_with_types = None

    def __iter__(self):
        """
        Iterator protocol yielding progress and data.

        Yields:
        - Tuple[Progress, List[Tuple]]: (progress_info, result_block)
        """
        return iter(self._progress_blocks)

    @property
    def columns_with_types(self):
        """
        Column metadata information.

        Returns:
        - List[Tuple]: Column info as (name, type) tuples
        """
        return self._columns_with_types

# Detailed progress tracking for query execution monitoring.
class Progress:
    """
    Query execution progress information.

    Provides detailed metrics about query processing progress
    including rows processed, bytes processed, and timing information.
    """

    def __init__(self, rows=0, bytes=0, total_rows=0, written_rows=0, written_bytes=0):
        """
        Initialize progress information.

        Parameters:
        - rows: Number of rows processed
        - bytes: Number of bytes processed
        - total_rows: Total estimated rows to process (0 if unknown)
        - written_rows: Number of rows written (for INSERT)
        - written_bytes: Number of bytes written (for INSERT)
        """
        # `bytes` shadows the builtin, but the parameter name is part of
        # the documented interface and is kept for compatibility.
        self._rows = rows
        self._bytes = bytes
        self._total_rows = total_rows
        self._written_rows = written_rows
        self._written_bytes = written_bytes

    @property
    def rows(self):
        """Number of rows processed so far."""
        return self._rows

    @property
    def bytes(self):
        """Number of bytes processed so far."""
        return self._bytes

    @property
    def total_rows(self):
        """Total estimated rows to process (0 if unknown)."""
        return self._total_rows

    @property
    def written_rows(self):
        """Number of rows written (for INSERT operations)."""
        return self._written_rows

    @property
    def written_bytes(self):
        """Number of bytes written (for INSERT operations)."""
        return self._written_bytes

    def __str__(self):
        """Human-readable progress representation."""
        return ('Progress(rows={}, bytes={}, total_rows={}, '
                'written_rows={}, written_bytes={})').format(
            self._rows, self._bytes, self._total_rows,
            self._written_rows, self._written_bytes)

# Metadata about query execution including statistics and profiling data.
class QueryInfo:
    """
    Query execution information and statistics.

    Contains metadata about query execution including
    performance metrics and resource usage.
    """

    def __init__(self, query_id=None, elapsed=None, rows_read=None, bytes_read=None):
        """
        Initialize query information.

        Parameters:
        - query_id: Unique query identifier
        - elapsed: Query execution time in seconds
        - rows_read: Total rows read during execution
        - bytes_read: Total bytes read during execution
        """
        # Original stub discarded its arguments; store them so the
        # documented properties below actually return them.
        self._query_id = query_id
        self._elapsed = elapsed
        self._rows_read = rows_read
        self._bytes_read = bytes_read

    @property
    def query_id(self):
        """Unique query identifier string."""
        return self._query_id

    @property
    def elapsed(self):
        """Query execution time in seconds."""
        return self._elapsed

    @property
    def rows_read(self):
        """Total rows read during query execution."""
        return self._rows_read

    @property
    def bytes_read(self):
        """Total bytes read during query execution."""
        return self._bytes_read

# Detailed execution profiling for performance analysis and optimization.
class BlockStreamProfileInfo:
    """
    Execution profiling information.

    Detailed performance metrics for query execution analysis
    including timing breakdowns and resource utilization.
    """

    def __init__(self, rows=0, blocks=0, bytes=0, applied_limit=0,
                 rows_before_limit=0, calculated_rows_before_limit=0):
        """
        Initialize profile information.

        Parameters:
        - rows: Number of rows in profile
        - blocks: Number of blocks processed
        - bytes: Number of bytes processed
        - applied_limit: Whether LIMIT was applied
        - rows_before_limit: Rows count before LIMIT application
        - calculated_rows_before_limit: Calculated rows before LIMIT
        """
        # `bytes` shadows the builtin but is kept for interface compatibility.
        self._rows = rows
        self._blocks = blocks
        self._bytes = bytes
        self._applied_limit = applied_limit
        self._rows_before_limit = rows_before_limit
        self._calculated_rows_before_limit = calculated_rows_before_limit

    @property
    def rows(self):
        """Number of rows in this profile segment."""
        return self._rows

    @property
    def blocks(self):
        """Number of blocks processed."""
        return self._blocks

    @property
    def bytes(self):
        """Number of bytes processed."""
        return self._bytes

    @property
    def applied_limit(self):
        """Whether LIMIT clause was applied."""
        return self._applied_limit

    @property
    def rows_before_limit(self):
        """Row count before LIMIT application."""
        return self._rows_before_limit

    @property
    def calculated_rows_before_limit(self):
        """Calculated row count before LIMIT (added for consistency
        with the constructor parameter of the same name)."""
        return self._calculated_rows_before_limit

# High-performance NumPy-based results for numerical computing
# workloads (requires NumPy extras).
class NumpyQueryResult:
    """
    NumPy-optimized query results.

    Query results stored as NumPy arrays for high-performance
    numerical computing workloads.

    Requires: pip install clickhouse-driver[numpy]
    """

    def __init__(self, data=None, columns_with_types=None):
        """
        Initialize NumPy query result.

        Parameters:
        - data: Result data (expected to be a numpy.ndarray)
        - columns_with_types: Optional column metadata

        NOTE(review): the original stub held no state; defaults keep
        no-argument construction working as before.
        """
        self._data = data
        self._columns_with_types = columns_with_types

    @property
    def data(self):
        """
        Query result data as NumPy structured array.

        Returns:
        - numpy.ndarray: Structured array with typed columns
        """
        return self._data

    @property
    def columns_with_types(self):
        """
        Column metadata information.

        Returns:
        - List[Tuple]: Column info as (name, type) tuples
        """
        return self._columns_with_types
class NumpyIterQueryResult:
    """
    NumPy streaming query result iterator.

    Streaming results with NumPy array blocks for memory-efficient
    processing of large numerical datasets.
    """

    def __init__(self, data_generator=None):
        """
        Initialize streaming NumPy result.

        Parameters:
        - data_generator: Optional iterable of array blocks.

        NOTE(review): the original stub held no state, making the
        iterator protocol non-functional; the default keeps no-argument
        construction working as before.
        """
        self._blocks = iter(data_generator) if data_generator is not None else iter(())

    def __iter__(self):
        """Iterator yielding NumPy array blocks."""
        return self

    def __next__(self):
        """
        Get next NumPy result block.

        Returns:
        - numpy.ndarray: Next block as structured array

        Raises:
        - StopIteration: When no more blocks available
        """
        return next(self._blocks)
class NumpyProgressQueryResult:
    """
    NumPy query result with progress tracking.

    Progress-aware NumPy results for monitoring long-running
    numerical computations.
    """

    def __init__(self, progress_blocks=None):
        """
        Initialize progress-aware NumPy result.

        Parameters:
        - progress_blocks: Optional iterable of (progress, array_block)
          pairs.  Default keeps no-argument construction working.
        """
        self._progress_blocks = progress_blocks if progress_blocks is not None else ()

    def __iter__(self):
        """
        Iterator yielding progress and NumPy data.

        Yields:
        - Tuple[Progress, numpy.ndarray]: (progress, array_block)
        """
        return iter(self._progress_blocks)

from clickhouse_driver import Client
# Basic query execution: indentation reconstructed from the mangled
# original; behavior unchanged.
client = Client('localhost')

# Standard query execution
result = client.execute('SELECT name, age FROM users LIMIT 10')
for name, age in result:
    print(f"{name}: {age} years old")

# Query with column types
columns, rows = client.execute(
    'SELECT name, age, salary FROM employees',
    with_column_types=True
)
print("Columns:", columns)
# Output: [('name', 'String'), ('age', 'UInt8'), ('salary', 'Decimal(10, 2)')]
for name, age, salary in rows:
    print(f"{name}, {age}, ${salary}")
# Memory-efficient processing of large datasets
# Stream a large result set block by block instead of loading it all.
# NOTE(review): clickhouse-driver's execute_iter commonly yields
# individual rows rather than blocks -- verify this block-wise pattern
# against the installed driver version.
query = 'SELECT * FROM large_table WHERE date >= %(start_date)s'
params = {'start_date': '2023-01-01'}
total_processed = 0
for block in client.execute_iter(query, params):
    # Process each block (typically 1000-10000 rows)
    for row in block:
        process_row(row)
        total_processed += 1
    print(f"Processed {total_processed} rows so far...")
print(f"Total rows processed: {total_processed}")
# Monitor progress of long-running queries
# Monitor progress of a long-running aggregation.
import time  # was used below without being imported anywhere in the original

query = '''
SELECT category, COUNT(*) as count, AVG(price) as avg_price
FROM huge_sales_table
WHERE date >= '2020-01-01'
GROUP BY category
'''
print("Starting large aggregation query...")
start_time = time.time()
# NOTE(review): clickhouse-driver's execute_with_progress yields
# (num_rows_read, total_rows) pairs and the final data comes from
# .get_result() -- verify this (progress, block) pattern against the
# installed driver version.
for progress, block in client.execute_with_progress(query):
    if progress:
        elapsed = time.time() - start_time
        print(f"Progress: {progress.rows:,} rows processed in {elapsed:.1f}s")
        if progress.total_rows > 0:
            percent = (progress.rows / progress.total_rows) * 100
            print(f"Estimated completion: {percent:.1f}%")
    if block:
        # Process results as they arrive
        for category, count, avg_price in block:
            print(f"{category}: {count:,} sales, avg ${avg_price:.2f}")
print("Query completed!")
# Process data in columnar format
# Fetch results column-wise for efficient aggregate processing.
columns, data = client.execute(
    'SELECT user_id, purchase_amount, purchase_date FROM purchases',
    with_column_types=True,
    columnar=True
)
# columnar=True returns data organized by columns (one sequence per
# selected column, in SELECT order), so unpack it directly.  The
# original's zip(*rows) would have transposed the columns back into
# rows, yielding the wrong values.
user_ids, amounts, dates = data if data else ([], [], [])

# Efficient column-wise processing
total_revenue = sum(amounts)
unique_users = len(set(user_ids))
latest_date = max(dates) if dates else None
print(f"Revenue: ${total_revenue:,.2f}")
print(f"Unique users: {unique_users:,}")
print(f"Latest purchase: {latest_date}")
# Compare different result formats
# Compare the three result formats supported by execute().
query = 'SELECT id, name, score FROM test_data LIMIT 1000'

# Standard tuple format (default)
result = client.execute(query)
print(f"Standard format: {len(result)} rows")
print(f"First row: {result[0]}")

# With column information
columns, rows = client.execute(query, with_column_types=True)
print(f"With types: {columns}")
print(f"Data: {len(rows)} rows")

# Columnar format: data is a list of per-column sequences in SELECT
# order, so unpack it directly -- the original's zip(*data) would have
# transposed the columns back into rows.
columns, data = client.execute(query, with_column_types=True, columnar=True)
if data:
    ids, names, scores = data
    print(f"Columnar: {len(ids)} ids, {len(names)} names, {len(scores)} scores")
# Process results with external tables
# Join against an in-memory external table sent with the query.
external_lookup = {
    'name': 'category_lookup',
    'structure': [('id', 'UInt32'), ('name', 'String')],
    'data': [(1, 'Electronics'), (2, 'Clothing'), (3, 'Books')]
}
result = client.execute('''
SELECT p.product_name, c.name as category_name, p.price
FROM products p
JOIN category_lookup c ON p.category_id = c.id
WHERE p.price > 100
''', external_tables=[external_lookup])
for product_name, category_name, price in result:
    print(f"{product_name} ({category_name}): ${price}")
# High-performance numerical processing (requires numpy extra)
import numpy as np

# Enable NumPy for numerical queries
client = Client('localhost', settings={'use_numpy': True})

# Query returning numerical data
result = client.execute('''
SELECT
measurement_time,
sensor_id,
temperature,
humidity,
pressure
FROM sensor_data
WHERE measurement_time >= now() - INTERVAL 1 HOUR
''')
# NOTE(review): with use_numpy the driver typically returns columnar
# NumPy arrays rather than a single structured array -- verify the
# structured-array access below against the installed driver version.
if hasattr(result, 'data') and isinstance(result.data, np.ndarray):
    data = result.data
    # Efficient NumPy operations
    avg_temp = np.mean(data['temperature'])
    max_humidity = np.max(data['humidity'])
    temp_std = np.std(data['temperature'])
    print(f"Average temperature: {avg_temp:.2f}°C")
    print(f"Max humidity: {max_humidity:.1f}%")
    print(f"Temperature std dev: {temp_std:.2f}°C")
    # Advanced NumPy analysis
    high_temp_sensors = np.unique(data[data['temperature'] > 30]['sensor_id'])
    print(f"High temperature sensors: {high_temp_sensors}")

from clickhouse_driver.errors import PartiallyConsumedQueryError
# A streaming query must be fully consumed before the same client can
# issue another query; indentation reconstructed from the mangled
# original (the draining loop belongs inside the except handler).
try:
    # Start streaming query
    result_iter = client.execute_iter('SELECT * FROM large_table')
    # Process first few blocks
    for i, block in enumerate(result_iter):
        if i >= 2:  # Process only first 2 blocks
            break
        process_block(block)
    # Try to execute another query before consuming all results
    client.execute('SELECT 1')  # This will raise PartiallyConsumedQueryError
except PartiallyConsumedQueryError:
    print("Previous query not fully consumed - finishing iteration")
    # Consume remaining results
    for block in result_iter:
        pass  # Discard remaining blocks
# Now can execute new query
client.execute('SELECT 1')

def process_query_with_callback(client, query, row_callback, batch_size=1000):
    """Process query results with a custom callback function.

    Parameters:
    - client: connected Client instance used for execute_iter
    - query: SQL text to stream
    - row_callback: callable invoked with each row tuple
    - batch_size: progress-report interval, in rows

    Returns:
    - int: number of rows successfully processed
    """
    processed_count = 0
    for block in client.execute_iter(query):
        for row in block:
            try:
                row_callback(row)
                processed_count += 1
                if processed_count % batch_size == 0:
                    print(f"Processed {processed_count:,} rows")
            except Exception as e:
                # Deliberate best-effort: report the failure and keep
                # streaming rather than aborting the whole query.
                print(f"Error processing row {processed_count}: {e}")
                continue
    return processed_count

# Usage with custom processing
def analyze_sale(row):
    product_id, sale_amount, customer_id = row
    # Custom analysis logic
    if sale_amount > 1000:
        print(f"Large sale: ${sale_amount} for product {product_id}")

total = process_query_with_callback(
    client,
    'SELECT product_id, amount, customer_id FROM sales',
    analyze_sale
)
print(f"Analyzed {total:,} sales records")
# Install with Tessl CLI
npx tessl i tessl/pypi-clickhouse-driver