CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-clickhouse-connect

ClickHouse Database Core Driver for Python, Pandas, and Superset

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/data-formats.md

Data Formats

Integration with the scientific Python ecosystem including NumPy arrays, Pandas DataFrames, and PyArrow tables for high-performance data processing and analysis workflows. Provides seamless conversion between ClickHouse data and popular data science libraries.

Capabilities

Pandas DataFrame Integration

Native support for querying data directly into Pandas DataFrames and inserting DataFrame data into ClickHouse tables with automatic type conversion and index handling.

def query_df(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    use_na_values: bool = True,
    max_str_len: int = 0,
    context: QueryContext | None = None
) -> pd.DataFrame:
    """
    Execute query and return results as Pandas DataFrame.
    
    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings for query execution
    - use_na_values: Use pandas NA values for ClickHouse NULLs
    - max_str_len: Maximum string column length (0 = unlimited)
    - context: Reusable query context
    
    Returns:
    pandas.DataFrame with properly typed columns and index
    
    Requires:
    pandas package (install with: pip install clickhouse-connect[pandas])
    """

def query_df_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[pd.DataFrame, None, None]:
    """
    Stream query results as DataFrame chunks.
    
    Yields:
    pandas.DataFrame objects for each result block
    
    Requires:
    pandas package
    """

def insert_df(
    self,
    table: str,
    df: pd.DataFrame,
    database: str = '',
    settings: dict | None = None,
    column_names: Sequence[str] | None = None,
    column_type_names: Sequence[str] | None = None
):
    """
    Insert Pandas DataFrame into ClickHouse table.
    
    Parameters:
    - table: Target table name
    - df: Pandas DataFrame to insert
    - database: Target database (uses client default if empty)
    - settings: ClickHouse settings for insert operation
    - column_names: Override DataFrame column names
    - column_type_names: Specify ClickHouse types for columns
    
    Features:
    - Automatic type conversion from pandas to ClickHouse types
    - Handles datetime, categorical, and nullable columns
    - Preserves precision for numeric types
    - Supports multi-index DataFrames
    
    Requires:
    pandas package
    """

NumPy Array Integration

Direct integration with NumPy for high-performance numerical computations with automatic handling of multidimensional arrays and dtype conversion.

def query_np(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    external_data: ExternalData | None = None,
    context: QueryContext | None = None
) -> np.ndarray:
    """
    Execute query and return results as NumPy array.
    
    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings for query execution
    - external_data: External data for query processing
    - context: Reusable query context
    
    Returns:
    numpy.ndarray with appropriate dtype for ClickHouse column types
    
    Features:
    - Automatic dtype selection based on ClickHouse types
    - Efficient memory layout for numerical operations
    - Support for structured arrays (named columns)
    - Handles nullable columns with masked arrays
    
    Requires:
    numpy package (install with: pip install clickhouse-connect[numpy])
    """

def query_np_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[np.ndarray, None, None]:
    """
    Stream query results as NumPy array chunks.
    
    Yields:
    numpy.ndarray objects for each result block
    
    Requires:
    numpy package
    """

PyArrow Integration

Integration with Apache Arrow for columnar data processing with zero-copy operations and interoperability with Arrow-based tools and file formats.

def query_arrow(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> pa.Table:
    """
    Execute query and return results as PyArrow Table.
    
    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings for query execution
    - context: Reusable query context
    
    Returns:
    pyarrow.Table with schema matching ClickHouse column types
    
    Features:
    - Zero-copy data transfer where possible
    - Preserves all ClickHouse type information
    - Efficient columnar storage format
    - Compatible with Arrow ecosystem tools
    
    Requires:
    pyarrow package (install with: pip install clickhouse-connect[arrow])
    """

def query_arrow_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[pa.Table, None, None]:
    """
    Stream query results as PyArrow Table chunks.
    
    Yields:
    pyarrow.Table objects for each result block
    
    Requires:
    pyarrow package
    """

def insert_arrow(
    self,
    table: str,
    arrow_table: pa.Table,
    database: str = '',
    settings: dict | None = None
):
    """
    Insert PyArrow Table into ClickHouse table.
    
    Parameters:
    - table: Target table name
    - arrow_table: PyArrow Table to insert
    - database: Target database (uses client default if empty)
    - settings: ClickHouse settings for insert operation
    
    Features:
    - Direct columnar data transfer
    - Automatic schema mapping from Arrow to ClickHouse
    - Efficient batch processing
    - Preserves metadata and type information
    
    Requires:
    pyarrow package
    """

Arrow Batch Processing

Advanced Arrow integration supporting batch operations and RecordBatch processing for memory-efficient handling of large datasets.

def to_arrow_batches(
    result: QueryResult,
    max_block_size: int = 65536
) -> Generator[pa.RecordBatch, None, None]:
    """
    Convert QueryResult to Arrow RecordBatch generator.
    
    Parameters:
    - result: QueryResult from query execution
    - max_block_size: Maximum rows per batch
    
    Yields:
    pyarrow.RecordBatch objects with consistent schema
    
    Requires:
    pyarrow package
    """

def arrow_buffer(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> BinaryIO:
    """
    Execute query and return Arrow IPC buffer.
    
    Parameters:
    - query: SQL query string
    - parameters: Query parameters dictionary
    - settings: ClickHouse settings
    - context: Query context
    
    Returns:
    Binary stream containing Arrow IPC data
    
    Requires:
    pyarrow package
    """

Data Type Conversion

Comprehensive type mapping between ClickHouse types and Python data science library types with configurable conversion options.

# ClickHouse to Pandas type mapping
CH_PANDAS_TYPE_MAP = {
    'String': 'object',
    'Int32': 'int32',
    'Int64': 'int64',
    'Float32': 'float32',
    'Float64': 'float64',
    'DateTime': 'datetime64[ns]',
    'Date': 'datetime64[ns]',
    'Bool': 'bool',
    'Nullable(Int32)': 'Int32',  # Pandas nullable integer
    'Array(String)': 'object'
}

# ClickHouse to NumPy dtype mapping
CH_NUMPY_TYPE_MAP = {
    'Int8': np.int8,
    'Int16': np.int16,
    'Int32': np.int32,
    'Int64': np.int64,
    'UInt8': np.uint8,
    'UInt16': np.uint16,
    'UInt32': np.uint32,
    'UInt64': np.uint64,
    'Float32': np.float32,
    'Float64': np.float64,
    'String': np.object_,
    'DateTime': 'datetime64[s]',
    'Date': 'datetime64[D]'
}

# ClickHouse to Arrow type mapping
CH_ARROW_TYPE_MAP = {
    'String': pa.string(),
    'Int32': pa.int32(),
    'Int64': pa.int64(),
    'Float64': pa.float64(),
    'DateTime': pa.timestamp('s'),
    'Date': pa.date32(),
    'Array(String)': pa.list_(pa.string()),
    'Nullable(Int32)': pa.int32()  # Arrow handles nullability natively
}

Usage Examples

Pandas DataFrame Operations

import clickhouse_connect
import pandas as pd

client = clickhouse_connect.get_client(host='localhost')

# Query to DataFrame
df = client.query_df("""
    SELECT 
        user_id,
        event_time,
        event_type,
        value
    FROM events 
    WHERE event_time >= '2023-01-01'
    ORDER BY event_time
""")

print(df.dtypes)
print(df.head())

# DataFrame analysis
daily_stats = df.groupby(df['event_time'].dt.date).agg({
    'user_id': 'nunique',
    'value': ['sum', 'mean', 'count']
})

# Insert processed DataFrame
client.insert_df('daily_stats', daily_stats)

# Streaming large datasets
for chunk_df in client.query_df_stream(
    'SELECT * FROM large_events_table',
    settings={'max_block_size': 50000}
):
    # Process each chunk
    processed_chunk = chunk_df.groupby('category').sum()
    client.insert_df('processed_events', processed_chunk)

NumPy Array Operations

import clickhouse_connect
import numpy as np

client = clickhouse_connect.get_client(host='localhost')

# Query to NumPy array
data = client.query_np("""
    SELECT 
        price,
        volume,
        timestamp
    FROM market_data
    WHERE symbol = 'AAPL'
    ORDER BY timestamp
""")

# Data is returned as structured array
prices = data['price']
volumes = data['volume']
timestamps = data['timestamp']

# Numerical analysis
price_changes = np.diff(prices)
volume_weighted_price = np.average(prices, weights=volumes)
correlation = np.corrcoef(prices[1:], volumes[1:])[0, 1]

print(f"VWAP: {volume_weighted_price}")
print(f"Price-Volume Correlation: {correlation}")

# Streaming numerical data
running_sum = 0
count = 0

for chunk in client.query_np_stream(
    'SELECT value FROM sensor_data ORDER BY timestamp',
    settings={'max_block_size': 100000}
):
    running_sum += np.sum(chunk['value'])
    count += len(chunk)
    
average = running_sum / count
print(f"Overall average: {average}")

PyArrow Table Operations

import clickhouse_connect
import pyarrow as pa

client = clickhouse_connect.get_client(host='localhost')

# Query to Arrow Table
table = client.query_arrow("""
    SELECT 
        customer_id,
        product_id,
        quantity,
        price,
        order_date
    FROM orders
    WHERE order_date >= '2023-01-01'
""")

# Arrow operations (pyarrow.compute must be imported explicitly)
import pyarrow.compute as pc

filtered_table = table.filter(
    pc.greater(table['quantity'], 10)
)

# Aggregation
summary = filtered_table.group_by(['product_id']).aggregate([
    ('quantity', 'sum'),
    ('price', 'mean'),
    ('customer_id', 'count_distinct')
])

print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}")

# Convert to other formats
pandas_df = table.to_pandas()
python_dict = table.to_pydict()  # dict mapping column name -> list of Python values

# Save to file formats (pyarrow.Table has no to_parquet method; use pyarrow.parquet)
import pyarrow.parquet as pq
pq.write_table(table, 'orders_export.parquet')

# Insert Arrow data
new_data = pa.table({
    'id': [1, 2, 3],
    'name': ['Alice', 'Bob', 'Carol'],
    'score': [95.5, 87.2, 92.1]
})

client.insert_arrow('test_scores', new_data)

Mixed Format Workflows

import clickhouse_connect
import pandas as pd
import numpy as np
import pyarrow as pa

client = clickhouse_connect.get_client(host='localhost')

# Start with raw data query
raw_data = client.query('SELECT * FROM raw_events')

# Convert to different formats as needed
df = pd.DataFrame(raw_data.result_set, columns=raw_data.column_names)

# Data cleaning with pandas
df_clean = df.dropna().reset_index(drop=True)
df_clean['processed_time'] = pd.Timestamp.now()

# Convert to Arrow for efficient storage
arrow_table = pa.Table.from_pandas(df_clean)

# Save intermediate results
client.insert_arrow('cleaned_events', arrow_table)

# Numerical analysis with NumPy
numeric_data = client.query_np(
    'SELECT value, timestamp FROM cleaned_events ORDER BY timestamp'
)

# Time series analysis
values = numeric_data['value']
timestamps = numeric_data['timestamp']

# Moving average calculation
window_size = 100
moving_avg = np.convolve(values, np.ones(window_size)/window_size, mode='valid')

# Store results back
results_df = pd.DataFrame({
    'timestamp': timestamps[window_size-1:],
    'original_value': values[window_size-1:],
    'moving_average': moving_avg,
    'deviation': values[window_size-1:] - moving_avg
})

client.insert_df('time_series_analysis', results_df)

Performance Optimization

# Optimize DataFrame queries
df = client.query_df(
    'SELECT * FROM large_table',
    settings={
        'max_threads': 8,
        'max_block_size': 65536,
        'max_memory_usage': '4G'
    },
    use_na_values=True,  # Use pandas NA values for ClickHouse NULLs
    max_str_len=1000     # Limit string length
)

# Streaming for memory efficiency
total_sum = 0
row_count = 0

for chunk in client.query_np_stream(
    'SELECT numeric_column FROM huge_table',
    settings={'max_block_size': 100000}
):
    total_sum += np.sum(chunk['numeric_column'])
    row_count += len(chunk)

average = total_sum / row_count

# Batch insert for better performance
batch_size = 10000
for i in range(0, len(large_df), batch_size):
    batch = large_df.iloc[i:i+batch_size]
    client.insert_df('target_table', batch)

Install with Tessl CLI

npx tessl i tessl/pypi-clickhouse-connect

docs

client-api.md

data-formats.md

dbapi.md

exceptions.md

index.md

sqlalchemy.md

utilities.md

tile.json