ClickHouse Database Core Driver for Python, Pandas, and Superset
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Complete SQLAlchemy dialect implementation enabling ORM capabilities, query building, and integration with SQL-based tools and frameworks. Provides seamless integration with the SQLAlchemy ecosystem for ClickHouse databases.
SQLAlchemy dialect for ClickHouse with automatic registration and connection URL support.
# Dialect registration constants.

# SQLAlchemy dialect name for ClickHouse, used as the URL scheme.
dialect_name: str = 'clickhousedb'

# Mapping of ClickHouse type names to SQLAlchemy column types.
ischema_names: dict[str, type]
class ClickHouseDialect:
    """SQLAlchemy dialect for the ClickHouse database.

    Registered automatically as:
      - clickhousedb.connect
      - clickhousedb

    Connection URL format::

        clickhousedb://username:password@host:port/database
    """

    name: str = 'clickhousedb'

    # Statement and type capabilities.
    supports_statement_cache: bool = True
    supports_native_boolean: bool = True
    supports_native_decimal: bool = True
    supports_unicode_statements: bool = True
    supports_unicode_binds: bool = True
    supports_default_values: bool = False
    supports_sequences: bool = False
    supports_native_enum: bool = True

    # ClickHouse auto-commits every statement; there is no transaction support.
    supports_transactions: bool = False
    autocommit: bool = True

    # Schema reflection capabilities.
    supports_reflection: bool = True
    supports_views: bool = True
    # ClickHouse uses different indexing (primary key / skip indexes),
    # not standard SQL secondary indexes.
    supports_indexes: bool = False


# Comprehensive mapping between ClickHouse data types and SQLAlchemy type
# system with support for complex types and nullable columns.
# Mapping of ClickHouse type names onto the SQLAlchemy type system.
from sqlalchemy import types as sqltypes

ischema_names = {
    # Signed integers.
    'Int8': sqltypes.SmallInteger,
    'Int16': sqltypes.SmallInteger,
    'Int32': sqltypes.Integer,
    'Int64': sqltypes.BigInteger,
    # Unsigned integers (widened one step so the full value range fits).
    'UInt8': sqltypes.SmallInteger,
    'UInt16': sqltypes.Integer,
    'UInt32': sqltypes.BigInteger,
    'UInt64': sqltypes.Numeric,
    # Floating point and fixed-precision numerics.
    'Float32': sqltypes.Float,
    'Float64': sqltypes.Float,
    'Decimal': sqltypes.Numeric,
    # String types.
    'String': sqltypes.String,
    'FixedString': sqltypes.CHAR,
    'LowCardinality(String)': sqltypes.String,
    # Date/time types.
    'Date': sqltypes.Date,
    'DateTime': sqltypes.DateTime,
    'DateTime64': sqltypes.DateTime,
    # Boolean type.
    'Bool': sqltypes.Boolean,
    # Array and composite types (composites surface as JSON).
    'Array': sqltypes.ARRAY,
    'Tuple': sqltypes.JSON,
    'Map': sqltypes.JSON,
    'Nested': sqltypes.JSON,
    # Special scalar types (rendered as strings).
    'UUID': sqltypes.String,
    'IPv4': sqltypes.String,
    'IPv6': sqltypes.String,
    'Enum8': sqltypes.Enum,
    'Enum16': sqltypes.Enum,
    # Nullable(...) wrapper around an inner type.
    'Nullable': sqltypes.TypeDecorator,
}

# SQLAlchemy engine and connection pool integration with ClickHouse-specific
# connection handling and configuration options.
def create_connect_args(self, url):
    """Build connection arguments from a SQLAlchemy URL.

    Parameters:
        url: SQLAlchemy URL object.

    Returns:
        Tuple of (args, kwargs) for connection creation.

    URL format examples::

        clickhousedb://user:pass@host:port/database
        clickhousedb+http://user:pass@host:8123/database
        clickhousedb+https://user:pass@host:8443/database?secure=true
    """


def do_connect(self, cparams):
    """Create a ClickHouse connection using clickhouse-connect.

    Parameters:
        cparams: Connection parameters from create_connect_args().

    Returns:
        ClickHouse connection object wrapped for SQLAlchemy.
    """


def do_close(self, dbapi_connection):
    """Close a ClickHouse connection."""


def do_ping(self, dbapi_connection) -> bool:
    """Test connection health."""


# Automatic schema discovery and table reflection capabilities for
# introspecting ClickHouse database structure.
def get_schema_names(self, connection, **kwargs) -> list[str]:
    """Return the list of schema (database) names available on the server."""


def get_table_names(self, connection, schema=None, **kwargs) -> list[str]:
    """Return the table names in the specified schema.

    Parameters:
        connection: SQLAlchemy connection.
        schema: Database name (None for the default database).
    """


def get_view_names(self, connection, schema=None, **kwargs) -> list[str]:
    """Return the view names in the specified schema.

    Parameters:
        connection: SQLAlchemy connection.
        schema: Database name (None for the default database).
    """


def get_columns(self, connection, table_name, schema=None, **kwargs) -> list[dict]:
    """Return column information for the specified table.

    Parameters:
        connection: SQLAlchemy connection.
        table_name: Name of the table.
        schema: Database name (None for the default database).

    Returns:
        List of column dictionaries with keys:
          - name: Column name
          - type: SQLAlchemy type object
          - nullable: Boolean nullable flag
          - default: Default value (if any)
          - comment: Column comment (if any)
    """


def get_pk_constraint(self, connection, table_name, schema=None, **kwargs) -> dict:
    """Return primary key constraint information.

    ClickHouse has no traditional primary keys, so empty constraint
    info is returned for compatibility.
    """


def get_foreign_keys(self, connection, table_name, schema=None, **kwargs) -> list:
    """Return foreign key constraints.

    ClickHouse does not support foreign keys, so an empty list is
    returned for compatibility.
    """


def get_indexes(self, connection, table_name, schema=None, **kwargs) -> list:
    """Return index information.

    ClickHouse uses different indexing mechanisms, so an empty list is
    returned for standard indexes.
    """


# SQL DDL statement compilation for CREATE TABLE, DROP TABLE, and other
# schema modification operations.
def visit_create_table(self, create, **kwargs) -> str:
    """Compile a CREATE TABLE statement for ClickHouse.

    Handles ClickHouse-specific table engines, partitioning, and other
    table creation options.
    """


def visit_drop_table(self, drop, **kwargs) -> str:
    """Compile a DROP TABLE statement."""


def visit_create_index(self, create, **kwargs) -> str:
    """Handle index creation.

    Translates to ClickHouse-appropriate indexing where applicable.
    """


# Example: basic connection and raw SQL.
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Create an engine from a dialect connection URL.
engine = create_engine('clickhousedb://default@localhost:8123/default')

# Verify the server is reachable before doing real work.
with engine.connect() as connection:
    version = connection.execute(text("SELECT version()")).scalar()
    print(f"ClickHouse version: {version}")

# Session factory bound to the engine.
SessionLocal = sessionmaker(bind=engine)
session = SessionLocal()

# Raw SQL executed through the session.
per_database = session.execute(text("""
    SELECT
        database,
        count() as table_count
    FROM system.tables
    GROUP BY database
"""))
for row in per_database:
    print(f"Database {row.database}: {row.table_count} tables")
session.close()

# Example: ORM model usage.
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float
# declarative_base() moved to sqlalchemy.orm in 1.4; the old
# sqlalchemy.ext.declarative location was removed in SQLAlchemy 2.0.
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Event(Base):
    """SQLAlchemy model for the events table."""

    __tablename__ = 'events'

    id = Column(Integer, primary_key=True)
    event_type = Column(String(100), nullable=False)
    user_id = Column(Integer, nullable=False)
    timestamp = Column(DateTime, nullable=False)
    value = Column(Float)

    def __repr__(self):
        return f"<Event(id={self.id}, type='{self.event_type}', user={self.user_id})>"


# Create engine and tables.
engine = create_engine('clickhousedb://default@localhost:8123/analytics')
# Note: ClickHouse table creation may require additional engine parameters;
# this creates the table structure for SQLAlchemy to work with.
Base.metadata.create_all(engine)

# Work with the ORM.
Session = sessionmaker(bind=engine)
session = Session()

# Query using the ORM.
events = session.query(Event).filter(
    Event.event_type == 'click'
).limit(100).all()
for event in events:
    print(f"{event.timestamp}: {event.event_type} by user {event.user_id}")
session.close()

# Example: schema reflection.
from sqlalchemy import create_engine, MetaData, Table
# Inspector.from_engine() is deprecated since SQLAlchemy 1.4;
# sqlalchemy.inspect() is the supported entry point for reflection.
from sqlalchemy import inspect

engine = create_engine('clickhousedb://default@localhost:8123/system')

# Reflect schema information.
inspector = inspect(engine)

# Get database names.
schemas = inspector.get_schema_names()
print(f"Available databases: {schemas}")

# Get tables in the system database.
tables = inspector.get_table_names(schema='system')
print(f"System tables: {len(tables)}")

# Reflect a specific table's structure.
table_columns = inspector.get_columns('tables', schema='system')
print("\nColumns in system.tables:")
for col in table_columns:
    print(f" {col['name']}: {col['type']} ({'nullable' if col['nullable'] else 'not null'})")

# Use a reflected table.
metadata = MetaData()
tables_table = Table('tables', metadata, autoload_with=engine, schema='system')

with engine.connect() as conn:
    # Query using the reflected table.
    result = conn.execute(tables_table.select().where(
        tables_table.c.database == 'default'
    ))
    for row in result:
        print(f"Table: {row.name}, Engine: {row.engine}")

# Example: Core query building.
from sqlalchemy import create_engine, text, select, func, and_, or_
from sqlalchemy import desc
from sqlalchemy.sql import column, table

engine = create_engine('clickhousedb://default@localhost:8123/analytics')

# Lightweight table definition for Core queries (no ORM mapping needed).
events = table(
    'events',
    column('id'),
    column('event_type'),
    column('user_id'),
    column('timestamp'),
    column('value'),
)

with engine.connect() as conn:
    # SQLAlchemy Core query building. select() takes column expressions
    # positionally; the legacy select([...]) list form was removed in 2.0.
    # Ordering uses desc() — a textual "event_count DESC" string with an
    # embedded direction is no longer accepted.
    query = select(
        events.c.event_type,
        func.count().label('event_count'),
        func.avg(events.c.value).label('avg_value'),
    ).where(
        and_(
            events.c.timestamp >= '2023-01-01',
            events.c.value.isnot(None),
        )
    ).group_by(events.c.event_type).order_by(desc('event_count'))

    result = conn.execute(query)
    for row in result:
        print(f"{row.event_type}: {row.event_count} events, avg value: {row.avg_value:.2f}")

    # Raw SQL with bound parameters.
    complex_query = text("""
        SELECT
            toYYYYMM(timestamp) as month,
            event_type,
            uniq(user_id) as unique_users,
            count() as total_events
        FROM events
        WHERE timestamp >= :start_date
          AND timestamp < :end_date
        GROUP BY month, event_type
        ORDER BY month, total_events DESC
    """)
    result = conn.execute(complex_query, {
        'start_date': '2023-01-01',
        'end_date': '2023-07-01'
    })
    for row in result:
        print(f"{row.month}: {row.event_type} - {row.unique_users} users, {row.total_events} events")

# Example: connection pooling.
from sqlalchemy import create_engine
# text() was used below without being imported — fixed here.
from sqlalchemy import text
from sqlalchemy.pool import QueuePool

# Engine with an explicitly configured connection pool.
engine = create_engine(
    'clickhousedb://analytics_user:password@clickhouse.example.com:8123/analytics',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,   # Validate connections before use
    pool_recycle=3600,    # Recycle connections after 1 hour
    connect_args={
        'compress': 'lz4',
        'settings': {
            'max_threads': 4,
            'max_memory_usage': '2G'
        }
    }
)


def check_connection_health():
    """Return True when a trivial round-trip query succeeds."""
    try:
        with engine.connect() as conn:
            result = conn.execute(text("SELECT 1"))
            return result.scalar() == 1
    except Exception as e:
        print(f"Connection health check failed: {e}")
        return False


if check_connection_health():
    print("Database connection is healthy")
else:
    print("Database connection issues detected")

# Example: pandas integration.
import pandas as pd
from sqlalchemy import create_engine

# Engine used for pandas read/write round-trips.
engine = create_engine('clickhousedb://default@localhost:8123/analytics')

# Load an aggregated query result straight into a DataFrame.
df = pd.read_sql_query("""
    SELECT
        user_id,
        event_type,
        COUNT(*) as event_count,
        AVG(value) as avg_value,
        MAX(timestamp) as last_event
    FROM events
    WHERE timestamp >= '2023-01-01'
    GROUP BY user_id, event_type
    ORDER BY event_count DESC
""", engine)
print(f"Loaded {len(df)} rows into DataFrame")
print(df.head())

# Quick analysis with pandas.
top_users = df.groupby('user_id')['event_count'].sum().nlargest(10)
print(f"\nTop 10 users by event count:")
print(top_users)

# Aggregate per event type and write the summary back via SQLAlchemy.
summary = df.groupby('event_type').agg({
    'event_count': 'sum',
    'avg_value': 'mean',
    'user_id': 'nunique'
}).reset_index()
summary.columns = ['event_type', 'total_events', 'mean_value', 'unique_users']

summary.to_sql(
    'event_summary',
    engine,
    if_exists='replace',
    index=False,
    method='multi'  # Batch insert
)
print("Processed data written to event_summary table")

# Install with Tessl CLI
npx tessl i tessl/pypi-clickhouse-connect