CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-clickhouse-driver

Python driver with native interface for ClickHouse database providing high-performance connectivity and comprehensive data type support.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/results-processing.md

Results Processing

Multiple result formats and processing modes for efficient handling of query results including standard tuples, streaming iterators, progress tracking, and optional NumPy arrays. The driver provides flexible result processing to optimize memory usage and performance for different workload patterns.

Capabilities

Standard Query Results

Basic query result storage and access with metadata information.

class QueryResult:
    """
    In-memory container for a finished query.

    Holds every result row at once, together with column
    metadata when it is supplied.
    """

    def __init__(self, data, columns_with_types=None):
        """
        Create a result holder.

        Parameters:
        - data: Result rows, given as a list of tuples
        - columns_with_types: Column metadata; optional, defaults to None
        """

    @property
    def data(self):
        """
        Rows produced by the query.

        Returns:
        - List[Tuple]: One tuple per result row
        """

    @property
    def columns_with_types(self):
        """
        Metadata describing the result columns.

        Returns:
        - List[Tuple]: One (name, type) tuple per column
        """

Streaming Query Results

Memory-efficient streaming results for processing large datasets without loading everything into memory.

class IterQueryResult:
    """
    Iterator over a query result delivered piece by piece.

    Result blocks are handed out one at a time, so a large
    result never has to be held in memory as a whole.
    """

    def __iter__(self):
        """
        Make the object usable in a for-loop.

        Returns:
        - Iterator: The object itself
        """

    def __next__(self):
        """
        Fetch the following block of the result.

        Returns:
        - List[Tuple]: Rows belonging to the next block

        Raises:
        - StopIteration: Once every block has been delivered
        """

    @property
    def columns_with_types(self):
        """
        Metadata describing the result columns.

        Only available once the first block has been read.

        Returns:
        - List[Tuple]: One (name, type) tuple per column
        """

Progress Tracking Results

Query results with execution progress information for monitoring long-running operations.

class ProgressQueryResult:
    """
    Query result with progress information.

    Iterating the object reports execution progress while the query
    runs; the result rows themselves are retrieved afterwards with
    get_result().
    """

    def __iter__(self):
        """
        Iterator protocol yielding progress counters.

        Yields:
        - Tuple[int, int]: (rows_processed, total_rows); total_rows is 0
          when the server provides no estimate
        """

    def get_result(self):
        """
        Return the query result once execution has finished.

        Intended to be called after the progress iteration is exhausted.

        Returns:
        - List[Tuple]: Result rows as tuples
        """

    @property
    def columns_with_types(self):
        """
        Column metadata information.

        Returns:
        - List[Tuple]: Column info as (name, type) tuples
        """

Progress Information

Detailed progress tracking for query execution monitoring.

class Progress:
    """
    Snapshot of how far a query has advanced.

    Carries counters for processed rows and bytes, the estimated
    total row count, and the rows and bytes written by INSERTs.
    """

    def __init__(self, rows=0, bytes=0, total_rows=0, written_rows=0, written_bytes=0):
        """
        Create a progress snapshot.

        Parameters:
        - rows: Rows processed
        - bytes: Bytes processed
        - total_rows: Estimated total of rows to process
        - written_rows: Rows written (INSERT)
        - written_bytes: Bytes written (INSERT)
        """

    @property
    def rows(self):
        """Rows processed up to now."""

    @property
    def bytes(self):
        """Bytes processed up to now."""

    @property
    def total_rows(self):
        """Estimated total of rows to process; 0 means unknown."""

    @property
    def written_rows(self):
        """Rows written by an INSERT operation."""

    @property
    def written_bytes(self):
        """Bytes written by an INSERT operation."""

    def __str__(self):
        """Progress rendered as human-readable text."""

Query Execution Information

Metadata about query execution including statistics and profiling data.

class QueryInfo:
    """
    Statistics describing one executed query.

    Groups the query identifier with its execution time and
    the amount of data read while it ran.
    """

    def __init__(self, query_id=None, elapsed=None, rows_read=None, bytes_read=None):
        """
        Create a query information record.

        Parameters:
        - query_id: Identifier that is unique to the query
        - elapsed: Execution time of the query, in seconds
        - rows_read: Rows read in total while executing
        - bytes_read: Bytes read in total while executing
        """

    @property
    def query_id(self):
        """Identifier string unique to the query."""

    @property
    def elapsed(self):
        """Execution time of the query, in seconds."""

    @property
    def rows_read(self):
        """Rows read in total while the query executed."""

    @property
    def bytes_read(self):
        """Bytes read in total while the query executed."""

Block Stream Profile Information

Detailed execution profiling for performance analysis and optimization.

class BlockStreamProfileInfo:
    """
    Execution profiling information.

    Row, block and byte counters for a query, plus details about
    how a LIMIT clause affected the row count.
    """

    def __init__(self, rows=0, blocks=0, bytes=0, applied_limit=0, 
                 rows_before_limit=0, calculated_rows_before_limit=0):
        """
        Initialize profile information.

        Parameters:
        - rows: Number of rows in profile
        - blocks: Number of blocks processed
        - bytes: Number of bytes processed
        - applied_limit: Whether LIMIT was applied
        - rows_before_limit: Rows count before LIMIT application
        - calculated_rows_before_limit: Calculated rows before LIMIT
        """

    @property
    def rows(self):
        """Number of rows in this profile segment."""

    @property
    def blocks(self):
        """Number of blocks processed."""

    @property
    def bytes(self):
        """Number of bytes processed."""

    @property
    def applied_limit(self):
        """Whether LIMIT clause was applied."""

    @property
    def rows_before_limit(self):
        """Row count before LIMIT application."""

    @property
    def calculated_rows_before_limit(self):
        """Calculated rows before LIMIT, as passed to the constructor."""

NumPy Integration Results

High-performance NumPy-based results for numerical computing workloads (requires NumPy extras).

class NumpyQueryResult:
    """
    Query result backed by NumPy arrays.

    Keeps the result as NumPy arrays, aimed at numerical
    computing workloads where performance matters.

    Requires: pip install clickhouse-driver[numpy]
    """

    @property
    def data(self):
        """
        Result of the query in the form of a NumPy structured array.

        Returns:
        - numpy.ndarray: Structured array whose columns are typed
        """

    @property
    def columns_with_types(self):
        """
        Metadata describing the result columns.

        Returns:
        - List[Tuple]: One (name, type) tuple per column
        """

class NumpyIterQueryResult:
    """
    Iterator over a query result delivered as NumPy blocks.

    Each step hands out one block as a NumPy array, so large
    numerical datasets can be processed without holding them whole.
    """

    def __iter__(self):
        """Iterate over the result, one NumPy array block at a time."""

    def __next__(self):
        """
        Fetch the following block of the result.

        Returns:
        - numpy.ndarray: The next block, as a structured array
        """

class NumpyProgressQueryResult:
    """
    NumPy-backed query result that also reports progress.

    Meant for keeping an eye on numerical computations that
    take a long time to finish.
    """

    def __iter__(self):
        """
        Iterate over progress updates paired with NumPy data.

        Yields:
        - Tuple[Progress, numpy.ndarray]: (progress, array_block)

        NOTE(review): the tuple shape above is the one stated on this
        page; confirm it against the driver before relying on it.
        """

Result Processing Examples

Basic Result Handling

from clickhouse_driver import Client

client = Client('localhost')

# Standard query execution: the return value is a list of row tuples.
result = client.execute('SELECT name, age FROM users LIMIT 10')
for name, age in result:
    print(f"{name}: {age} years old")

# Query with column types.
# with_column_types=True returns a (rows, columns_with_types) pair:
# the rows come first and the column metadata second.
rows, columns = client.execute(
    'SELECT name, age, salary FROM employees',
    with_column_types=True
)

print("Columns:", columns)
# Output: Columns: [('name', 'String'), ('age', 'UInt8'), ('salary', 'Decimal(10, 2)')]

for name, age, salary in rows:
    print(f"{name}, {age}, ${salary}")

Streaming Large Results

# Memory-efficient processing of large datasets
query = 'SELECT * FROM large_table WHERE date >= %(start_date)s'
params = {'start_date': '2023-01-01'}

# How often (in rows) a progress line is printed.
report_every = 10000

total_processed = 0
# execute_iter() streams the result and yields one row tuple at a time,
# so each item is handled as a row rather than as a block of rows.
for row in client.execute_iter(query, params):
    # process_row is not defined in this snippet; supply your own handler.
    process_row(row)
    total_processed += 1

    if total_processed % report_every == 0:
        print(f"Processed {total_processed} rows so far...")

print(f"Total rows processed: {total_processed}")

Progress Monitoring

import time

# Monitor progress of long-running queries
query = '''
    SELECT category, COUNT(*) as count, AVG(price) as avg_price
    FROM huge_sales_table 
    WHERE date >= '2020-01-01'
    GROUP BY category
'''

print("Starting large aggregation query...")
start_time = time.time()

# execute_with_progress() returns a progress object. Iterating it yields
# (rows_processed, total_rows) pairs while the query runs; the result rows
# are fetched afterwards with get_result().
progress = client.execute_with_progress(query)

for num_rows, total_rows in progress:
    elapsed = time.time() - start_time
    print(f"Progress: {num_rows:,} rows processed in {elapsed:.1f}s")

    # total_rows is falsy when no estimate is available, so guard the
    # division instead of dividing by zero.
    if total_rows:
        percent = (num_rows / total_rows) * 100
        print(f"Estimated completion: {percent:.1f}%")

# The progress loop ran to completion, so the full result can be read.
for category, count, avg_price in progress.get_result():
    print(f"{category}: {count:,} sales, avg ${avg_price:.2f}")

print("Query completed!")

Columnar Data Processing

# Process data in columnar format
# with_column_types=True returns (data, columns_with_types): data first.
data, columns = client.execute(
    'SELECT user_id, purchase_amount, purchase_date FROM purchases',
    with_column_types=True,
    columnar=True
)

# columnar=True already returns one sequence per selected column, so the
# columns are unpacked directly; transposing with zip(*data) would turn
# them back into rows. An empty result has no columns to unpack.
user_ids, amounts, dates = data if data else ([], [], [])

# Efficient column-wise processing
total_revenue = sum(amounts)
unique_users = len(set(user_ids))
latest_date = max(dates) if dates else None

print(f"Revenue: ${total_revenue:,.2f}")
print(f"Unique users: {unique_users:,}")
print(f"Latest purchase: {latest_date}")

Result Format Comparisons

# Compare different result formats
query = 'SELECT id, name, score FROM test_data LIMIT 1000'

# Standard tuple format (default)
result = client.execute(query)
print(f"Standard format: {len(result)} rows")
# Indexing an empty result would raise IndexError, so check first.
if result:
    print(f"First row: {result[0]}")

# With column information: the pair is (rows, columns_with_types).
rows, columns = client.execute(query, with_column_types=True)
print(f"With types: {columns}")
print(f"Data: {len(rows)} rows")

# Columnar format: data is already one sequence per selected column,
# so it is unpacked as-is instead of being transposed with zip().
data, columns = client.execute(query, with_column_types=True, columnar=True)
if data:
    ids, names, scores = data
    print(f"Columnar: {len(ids)} ids, {len(names)} names, {len(scores)} scores")

External Table Results

# Join a query against an external (client-supplied) lookup table.
# The table is described by its name, its column structure and its rows.
category_table = dict(
    name='category_lookup',
    structure=[('id', 'UInt32'), ('name', 'String')],
    data=[(1, 'Electronics'), (2, 'Clothing'), (3, 'Books')],
)

expensive_products_sql = '''
    SELECT p.product_name, c.name as category_name, p.price
    FROM products p
    JOIN category_lookup c ON p.category_id = c.id
    WHERE p.price > 100
'''

# The external table is shipped along with the query itself.
expensive_products = client.execute(
    expensive_products_sql,
    external_tables=[category_table],
)

for item in expensive_products:
    product_name, category_name, price = item
    print(f"{product_name} ({category_name}): ${price}")

NumPy Integration

# High-performance numerical processing (requires numpy extra)
import numpy as np

# Enable NumPy for numerical queries
client = Client('localhost', settings={'use_numpy': True})

# Query returning numerical data.
# With use_numpy enabled, columnar=True makes execute() return a list
# holding one NumPy array per selected column, in SELECT order. The return
# value is a plain list, so there is no `.data` attribute to inspect.
columns = client.execute('''
    SELECT 
        measurement_time,
        sensor_id,
        temperature,
        humidity,
        pressure
    FROM sensor_data 
    WHERE measurement_time >= now() - INTERVAL 1 HOUR
''', columnar=True)

# An empty result has nothing to unpack, and np.max() raises on an empty
# array, so the analysis only runs when at least one row came back.
if columns and len(columns[0]):
    _measurement_time, sensor_id, temperature, humidity, _pressure = columns

    # Efficient NumPy operations
    avg_temp = np.mean(temperature)
    max_humidity = np.max(humidity)
    temp_std = np.std(temperature)

    print(f"Average temperature: {avg_temp:.2f}°C")
    print(f"Max humidity: {max_humidity:.1f}%")
    print(f"Temperature std dev: {temp_std:.2f}°C")

    # Advanced NumPy analysis: boolean mask selects the hot readings.
    high_temp_sensors = np.unique(sensor_id[temperature > 30])
    print(f"High temperature sensors: {high_temp_sensors}")

Error Handling with Results

from clickhouse_driver.errors import PartiallyConsumedQueryError

# Demonstrates what happens when a second query is issued while a
# streaming result from the same client has not been read to the end.
# NOTE(review): `client` and `process_block` are not defined in this
# snippet; they must be provided by the surrounding code.
try:
    # Start streaming query
    result_iter = client.execute_iter('SELECT * FROM large_table')
    
    # Process first few blocks
    # NOTE(review): execute_iter appears to yield individual rows rather
    # than blocks by default - confirm against the driver documentation.
    for i, block in enumerate(result_iter):
        # The third item (i == 2) is fetched from the iterator before the
        # loop stops, so it is read but never passed to process_block.
        if i >= 2:  # Process only first 2 blocks
            break
        process_block(block)
    
    # Try to execute another query before consuming all results
    client.execute('SELECT 1')  # This will raise PartiallyConsumedQueryError
    
except PartiallyConsumedQueryError:
    print("Previous query not fully consumed - finishing iteration")
    
    # Consume remaining results
    # NOTE(review): the driver may reset the connection when it raises this
    # error, in which case result_iter could not be resumed here - confirm
    # before relying on this recovery path.
    for block in result_iter:
        pass  # Discard remaining blocks
    
    # Now can execute new query
    client.execute('SELECT 1')

Custom Result Processing

def process_query_with_callback(client, query, row_callback, batch_size=1000):
    """Stream a query and hand each result row to a callback.

    Parameters:
    - client: Object whose execute_iter(query) yields result rows
    - query: Query text passed to client.execute_iter
    - row_callback: Callable invoked once per row
    - batch_size: A progress line is printed every batch_size successfully
      processed rows; 0 or None disables progress output

    Returns:
    - int: Number of rows for which row_callback finished without raising
    """
    processed_count = 0

    # execute_iter() yields one row at a time, so every item is passed to
    # the callback as a whole row.
    for row in client.execute_iter(query):
        try:
            row_callback(row)
        except Exception as e:
            # Deliberate best-effort: a failing row is reported and skipped
            # so one bad record does not abort the whole stream.
            print(f"Error processing row {processed_count}: {e}")
            continue

        processed_count += 1

        # Kept outside the try block so a progress-reporting problem (for
        # example batch_size=0) is never misreported as a row error.
        if batch_size and processed_count % batch_size == 0:
            print(f"Processed {processed_count:,} rows")

    return processed_count

# Usage with custom processing
def analyze_sale(row):
    """Report a sale when its amount is above 1000.

    Parameters:
    - row: Three-item sequence of (product_id, sale_amount, customer_id)
    """
    product_id, sale_amount, customer_id = row
    # Guard clause: anything that is not strictly above 1000 is ignored.
    if not sale_amount > 1000:
        return
    print(f"Large sale: ${sale_amount} for product {product_id}")

# Run the streaming helper over the sales table, analysing every row.
sales_query = 'SELECT product_id, amount, customer_id FROM sales'
total = process_query_with_callback(client, sales_query, analyze_sale)

print(f"Analyzed {total:,} sales records")

Install with Tessl CLI

npx tessl i tessl/pypi-clickhouse-driver

docs

client-interface.md

compression.md

data-types.md

dbapi-interface.md

error-handling.md

index.md

results-processing.md

tile.json