ClickHouse Database Core Driver for Python, Pandas, and Superset
Integration with the scientific Python ecosystem, including NumPy arrays, Pandas DataFrames, and PyArrow tables, for high-performance data processing and analysis workflows, with seamless conversion between ClickHouse data and popular data science libraries.
Native support for querying data directly into Pandas DataFrames and inserting DataFrame data into ClickHouse tables with automatic type conversion and index handling.
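A minimal quick-start sketch (the host and the events table are assumptions for illustration):

import clickhouse_connect

# Connect to a local ClickHouse server
client = clickhouse_connect.get_client(host='localhost')
# Query straight into a DataFrame (hypothetical table)
df = client.query_df('SELECT event_type, count() AS events FROM events GROUP BY event_type')
print(df)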
def query_df(
self,
query: str,
parameters: dict | None = None,
settings: dict | None = None,
use_na_values: bool = True,
max_str_len: int = 0,
context: QueryContext | None = None
) -> pd.DataFrame:
"""
Execute query and return results as Pandas DataFrame.
Parameters:
- query: SQL query string
- parameters: Query parameters dictionary
- settings: ClickHouse settings for query execution
- use_na_values: Use pandas NA values for ClickHouse NULLs
- max_str_len: Maximum string column length (0 = unlimited)
- context: Reusable query context
Returns:
pandas.DataFrame with properly typed columns and index
Requires:
pandas package (install with: pip install clickhouse-connect[pandas])
"""
def query_df_stream(
self,
query: str,
parameters: dict | None = None,
settings: dict | None = None,
context: QueryContext | None = None
) -> Generator[pd.DataFrame, None, None]:
"""
Stream query results as DataFrame chunks.
Yields:
pandas.DataFrame objects for each result block
Requires:
pandas package
"""
def insert_df(
self,
table: str,
df: pd.DataFrame,
database: str = '',
settings: dict | None = None,
column_names: Sequence[str] | None = None,
column_type_names: Sequence[str] | None = None
):
"""
Insert Pandas DataFrame into ClickHouse table.
Parameters:
- table: Target table name
- df: Pandas DataFrame to insert
- database: Target database (uses client default if empty)
- settings: ClickHouse settings for insert operation
- column_names: Override DataFrame column names
- column_type_names: Specify ClickHouse types for columns
Features:
- Automatic type conversion from pandas to ClickHouse types
- Handles datetime, categorical, and nullable columns
- Preserves precision for numeric types
- Supports multi-index DataFrames
Requires:
pandas package
"""Direct integration with NumPy for high-performance numerical computations with automatic handling of multidimensional arrays and dtype conversion.
def query_np(
self,
query: str,
parameters: dict | None = None,
settings: dict | None = None,
external_data: ExternalData | None = None,
context: QueryContext | None = None
) -> np.ndarray:
"""
Execute query and return results as NumPy array.
Parameters:
- query: SQL query string
- parameters: Query parameters dictionary
- settings: ClickHouse settings for query execution
- external_data: External data for query processing
- context: Reusable query context
Returns:
numpy.ndarray with appropriate dtype for ClickHouse column types
Features:
- Automatic dtype selection based on ClickHouse types
- Efficient memory layout for numerical operations
- Support for structured arrays (named columns)
- Handles nullable columns with masked arrays
Requires:
numpy package (install with: pip install clickhouse-connect[numpy])
"""
def query_np_stream(
self,
query: str,
parameters: dict | None = None,
settings: dict | None = None,
context: QueryContext | None = None
) -> Generator[np.ndarray, None, None]:
"""
Stream query results as NumPy array chunks.
Yields:
numpy.ndarray objects for each result block
Requires:
numpy package
"""Integration with Apache Arrow for columnar data processing with zero-copy operations and interoperability with Arrow-based tools and file formats.
def query_arrow(
self,
query: str,
parameters: dict | None = None,
settings: dict | None = None,
context: QueryContext | None = None
) -> pa.Table:
"""
Execute query and return results as PyArrow Table.
Parameters:
- query: SQL query string
- parameters: Query parameters dictionary
- settings: ClickHouse settings for query execution
- context: Reusable query context
Returns:
pyarrow.Table with schema matching ClickHouse column types
Features:
- Zero-copy data transfer where possible
- Preserves all ClickHouse type information
- Efficient columnar storage format
- Compatible with Arrow ecosystem tools
Requires:
pyarrow package (install with: pip install clickhouse-connect[arrow])
"""
def query_arrow_stream(
self,
query: str,
parameters: dict | None = None,
settings: dict | None = None,
context: QueryContext | None = None
) -> Generator[pa.Table, None, None]:
"""
Stream query results as PyArrow Table chunks.
Yields:
pyarrow.Table objects for each result block
Requires:
pyarrow package
"""
def insert_arrow(
self,
table: str,
arrow_table: pa.Table,
database: str = '',
settings: dict | None = None
):
"""
Insert PyArrow Table into ClickHouse table.
Parameters:
- table: Target table name
- arrow_table: PyArrow Table to insert
- database: Target database (uses client default if empty)
- settings: ClickHouse settings for insert operation
Features:
- Direct columnar data transfer
- Automatic schema mapping from Arrow to ClickHouse
- Efficient batch processing
- Preserves metadata and type information
Requires:
pyarrow package
"""Advanced Arrow integration supporting batch operations and RecordBatch processing for memory-efficient handling of large datasets.
def to_arrow_batches(
result: QueryResult,
max_block_size: int = 65536
) -> Generator[pa.RecordBatch, None, None]:
"""
Convert QueryResult to Arrow RecordBatch generator.
Parameters:
- result: QueryResult from query execution
- max_block_size: Maximum rows per batch
Yields:
pyarrow.RecordBatch objects with consistent schema
Requires:
pyarrow package
"""
def arrow_buffer(
self,
query: str,
parameters: dict | None = None,
settings: dict | None = None,
context: QueryContext | None = None
) -> BinaryIO:
"""
Execute query and return Arrow IPC buffer.
Parameters:
- query: SQL query string
- parameters: Query parameters dictionary
- settings: ClickHouse settings
- context: Query context
Returns:
Binary stream containing Arrow IPC data
Requires:
pyarrow package
"""Comprehensive type mapping between ClickHouse types and Python data science library types with configurable conversion options.
# ClickHouse to Pandas type mapping
CH_PANDAS_TYPE_MAP = {
'String': 'object',
'Int32': 'int32',
'Int64': 'int64',
'Float32': 'float32',
'Float64': 'float64',
'DateTime': 'datetime64[ns]',
'Date': 'datetime64[ns]',
'Bool': 'bool',
'Nullable(Int32)': 'Int32', # Pandas nullable integer
'Array(String)': 'object'
}
# ClickHouse to NumPy dtype mapping
CH_NUMPY_TYPE_MAP = {
'Int8': np.int8,
'Int16': np.int16,
'Int32': np.int32,
'Int64': np.int64,
'UInt8': np.uint8,
'UInt16': np.uint16,
'UInt32': np.uint32,
'UInt64': np.uint64,
'Float32': np.float32,
'Float64': np.float64,
'String': np.object_,
'DateTime': 'datetime64[s]',
'Date': 'datetime64[D]'
}
# ClickHouse to Arrow type mapping
CH_ARROW_TYPE_MAP = {
'String': pa.string(),
'Int32': pa.int32(),
'Int64': pa.int64(),
'Float64': pa.float64(),
'DateTime': pa.timestamp('s'),
'Date': pa.date32(),
'Array(String)': pa.list_(pa.string()),
'Nullable(Int32)': pa.int32() # Arrow handles nullability natively
}

import clickhouse_connect
import pandas as pd
client = clickhouse_connect.get_client(host='localhost')
# Query to DataFrame
df = client.query_df("""
SELECT
user_id,
event_time,
event_type,
value
FROM events
WHERE event_time >= '2023-01-01'
ORDER BY event_time
""")
print(df.dtypes)
print(df.head())
# DataFrame analysis
daily_stats = df.groupby(df['event_time'].dt.date).agg(
    unique_users=('user_id', 'nunique'),
    value_sum=('value', 'sum'),
    value_mean=('value', 'mean'),
    event_count=('value', 'count')
).rename_axis('event_date').reset_index()
# Insert processed DataFrame (flat column names match the target table)
client.insert_df('daily_stats', daily_stats)
# Streaming large datasets
for chunk_df in client.query_df_stream(
'SELECT * FROM large_events_table',
settings={'max_block_size': 50000}
):
    # Process each chunk and flatten the group key back into a column
    processed_chunk = chunk_df.groupby('category').sum().reset_index()
    client.insert_df('processed_events', processed_chunk)

import clickhouse_connect
import numpy as np
client = clickhouse_connect.get_client(host='localhost')
# Query to NumPy array
data = client.query_np("""
SELECT
price,
volume,
timestamp
FROM market_data
WHERE symbol = 'AAPL'
ORDER BY timestamp
""")
# Data is returned as structured array
prices = data['price']
volumes = data['volume']
timestamps = data['timestamp']
# Numerical analysis
price_changes = np.diff(prices)
volume_weighted_price = np.average(prices, weights=volumes)
correlation = np.corrcoef(prices[1:], volumes[1:])[0, 1]
print(f"VWAP: {volume_weighted_price}")
print(f"Price-Volume Correlation: {correlation}")
# Streaming numerical data
running_sum = 0
count = 0
for chunk in client.query_np_stream(
'SELECT value FROM sensor_data ORDER BY timestamp',
settings={'max_block_size': 100000}
):
    running_sum += np.sum(chunk['value'])
    count += len(chunk)
average = running_sum / count
print(f"Overall average: {average}")import clickhouse_connect
import pyarrow as pa
client = clickhouse_connect.create_client(host='localhost')
# Query to Arrow Table
table = client.query_arrow("""
SELECT
customer_id,
product_id,
quantity,
price,
order_date
FROM orders
WHERE order_date >= '2023-01-01'
""")
# Arrow operations
filtered_table = table.filter(
    pc.greater(table['quantity'], 10)
)
# Aggregation
summary = filtered_table.group_by(['product_id']).aggregate([
('quantity', 'sum'),
('price', 'mean'),
('customer_id', 'count_distinct')
])
print(f"Schema: {table.schema}")
print(f"Rows: {table.num_rows}")
# Convert to other formats
pandas_df = table.to_pandas()
column_dict = table.to_pydict()  # dict mapping column names to lists of Python values
# Save to file formats
pq.write_table(table, 'orders_export.parquet')
# Insert Arrow data
new_data = pa.table({
'id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Carol'],
'score': [95.5, 87.2, 92.1]
})
client.insert_arrow('test_scores', new_data)

import clickhouse_connect
import pandas as pd
import numpy as np
import pyarrow as pa
client = clickhouse_connect.get_client(host='localhost')
# Start with raw data query
raw_data = client.query('SELECT * FROM raw_events')
# Convert to different formats as needed
df = pd.DataFrame(raw_data.result_set, columns=raw_data.column_names)
# Data cleaning with pandas
df_clean = df.dropna().reset_index(drop=True)
df_clean['processed_time'] = pd.Timestamp.now()
# Convert to Arrow for efficient storage
arrow_table = pa.Table.from_pandas(df_clean)
# Save intermediate results
client.insert_arrow('cleaned_events', arrow_table)
# Numerical analysis with NumPy
numeric_data = client.query_np(
'SELECT value, timestamp FROM cleaned_events ORDER BY timestamp'
)
# Time series analysis
values = numeric_data['value']
timestamps = numeric_data['timestamp']
# Moving average calculation
window_size = 100
moving_avg = np.convolve(values, np.ones(window_size)/window_size, mode='valid')
# Store results back
results_df = pd.DataFrame({
'timestamp': timestamps[window_size-1:],
'original_value': values[window_size-1:],
'moving_average': moving_avg,
'deviation': values[window_size-1:] - moving_avg
})
client.insert_df('time_series_analysis', results_df)

# Optimize DataFrame queries
df = client.query_df(
'SELECT * FROM large_table',
settings={
'max_threads': 8,
'max_block_size': 65536,
'max_memory_usage': '4G'
},
use_na_values=True,  # Use pandas NA values to represent ClickHouse NULLs
max_str_len=1000 # Limit string length
)
# Streaming for memory efficiency
total_sum = 0
row_count = 0
for chunk in client.query_np_stream(
'SELECT numeric_column FROM huge_table',
settings={'max_block_size': 100000}
):
    total_sum += np.sum(chunk['numeric_column'])
    row_count += len(chunk)
average = total_sum / row_count
# Batch insert for better performance
batch_size = 10000
for i in range(0, len(large_df), batch_size):
    batch = large_df.iloc[i:i+batch_size]
    client.insert_df('target_table', batch)

Install with Tessl CLI
npx tessl i tessl/pypi-clickhouse-connect