CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-clickhouse-connect

ClickHouse Database Core Driver for Python, Pandas, and Superset

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/sqlalchemy.md

SQLAlchemy Integration

Complete SQLAlchemy dialect implementation enabling ORM capabilities, query building, and integration with SQL-based tools and frameworks. Provides seamless integration with the SQLAlchemy ecosystem for ClickHouse databases.

Capabilities

Dialect Registration

SQLAlchemy dialect for ClickHouse with automatic registration and connection URL support.

# Dialect constants
# Name the dialect registers under; it is the URL scheme in
# connection strings (clickhousedb://...).
dialect_name: str = 'clickhousedb'
"""SQLAlchemy dialect name for ClickHouse."""

# Populated with the ClickHouse -> SQLAlchemy type table shown below.
ischema_names: dict[str, type]
"""Mapping of ClickHouse type names to SQLAlchemy types."""

class ClickHouseDialect:
    """
    SQLAlchemy dialect for ClickHouse database.

    Registered automatically as:
    - clickhousedb.connect
    - clickhousedb

    Connection URL format:
    clickhousedb://username:password@host:port/database

    The class attributes below are the standard SQLAlchemy dialect
    capability flags; SQLAlchemy reads them to decide which SQL
    constructs and behaviors it may use against this backend.
    """

    name: str = 'clickhousedb'
    supports_statement_cache: bool = True
    supports_native_boolean: bool = True
    supports_native_decimal: bool = True
    supports_unicode_statements: bool = True
    supports_unicode_binds: bool = True
    supports_default_values: bool = False  # no DEFAULT VALUES clause
    supports_sequences: bool = False       # ClickHouse has no sequences
    supports_native_enum: bool = True

    # Transaction support (ClickHouse auto-commits)
    supports_transactions: bool = False
    autocommit: bool = True

    # Schema reflection capabilities
    supports_reflection: bool = True
    supports_views: bool = True
    supports_indexes: bool = False  # ClickHouse uses different indexing

Type System Integration

Comprehensive mapping between ClickHouse data types and SQLAlchemy type system with support for complex types and nullable columns.

# ClickHouse to SQLAlchemy type mappings
from sqlalchemy import types as sqltypes

ischema_names = {
    # Integer types.
    # Unsigned types map to the next-larger signed SQLAlchemy type so the
    # full value range fits (e.g. UInt16 max 65535 exceeds SmallInteger).
    'Int8': sqltypes.SmallInteger,
    'Int16': sqltypes.SmallInteger,
    'Int32': sqltypes.Integer,
    'Int64': sqltypes.BigInteger,
    'UInt8': sqltypes.SmallInteger,
    'UInt16': sqltypes.Integer,
    'UInt32': sqltypes.BigInteger,
    'UInt64': sqltypes.Numeric,  # exceeds signed 64-bit BigInteger range

    # Floating point types
    'Float32': sqltypes.Float,
    'Float64': sqltypes.Float,
    'Decimal': sqltypes.Numeric,

    # String types
    'String': sqltypes.String,
    'FixedString': sqltypes.CHAR,
    'LowCardinality(String)': sqltypes.String,

    # Date/time types
    'Date': sqltypes.Date,
    'DateTime': sqltypes.DateTime,
    'DateTime64': sqltypes.DateTime,

    # Boolean type
    'Bool': sqltypes.Boolean,

    # Array and complex types.
    # Tuple/Map/Nested have no direct SQLAlchemy equivalent; JSON is used
    # as a generic structured-value carrier.
    'Array': sqltypes.ARRAY,
    'Tuple': sqltypes.JSON,
    'Map': sqltypes.JSON,
    'Nested': sqltypes.JSON,

    # Special types (represented as their string form)
    'UUID': sqltypes.String,
    'IPv4': sqltypes.String,
    'IPv6': sqltypes.String,
    'Enum8': sqltypes.Enum,
    'Enum16': sqltypes.Enum,

    # Nullable wrapper
    # NOTE(review): TypeDecorator is an abstract wrapper base, not a
    # concrete column type — presumably Nullable(T) is unwrapped to T
    # during reflection; confirm against the dialect implementation.
    'Nullable': sqltypes.TypeDecorator,
}

Connection Management

SQLAlchemy engine and connection pool integration with ClickHouse-specific connection handling and configuration options.

def create_connect_args(self, url):
    """
    Build connection arguments from a SQLAlchemy URL.

    Parameters:
    - url: SQLAlchemy URL object parsed from the engine connection string

    Returns:
    Tuple of (args, kwargs) handed to the driver-level connection call

    URL format examples:
    clickhousedb://user:pass@host:port/database
    clickhousedb+http://user:pass@host:8123/database
    clickhousedb+https://user:pass@host:8443/database?secure=true
    """

def do_connect(self, cparams):
    """
    Create a ClickHouse connection using clickhouse-connect.

    Parameters:
    - cparams: Connection parameters produced by create_connect_args()

    Returns:
    ClickHouse connection object wrapped for SQLAlchemy's DBAPI protocol
    """

def do_close(self, dbapi_connection):
    """Close the underlying ClickHouse connection and release its resources."""

def do_ping(self, dbapi_connection) -> bool:
    """Test connection health; used e.g. by pool_pre_ping before checkout."""

Schema Reflection

Automatic schema discovery and table reflection capabilities for introspecting ClickHouse database structure.

def get_schema_names(self, connection, **kwargs) -> list[str]:
    """
    Get list of schema (database) names.

    In ClickHouse a "schema" corresponds to a database.

    Returns:
    List of database names available on the server
    """

def get_table_names(self, connection, schema=None, **kwargs) -> list[str]:
    """
    Get list of table names in the specified schema.

    Parameters:
    - connection: SQLAlchemy connection
    - schema: Database name (None for the connection's default database)

    Returns:
    List of table names in the schema
    """

def get_view_names(self, connection, schema=None, **kwargs) -> list[str]:
    """
    Get list of view names in the specified schema.

    Parameters:
    - connection: SQLAlchemy connection
    - schema: Database name (None for the connection's default database)

    Returns:
    List of view names in the schema
    """

def get_columns(self, connection, table_name, schema=None, **kwargs) -> list[dict]:
    """
    Get column information for the specified table.

    Parameters:
    - connection: SQLAlchemy connection
    - table_name: Name of the table
    - schema: Database name (None for the connection's default database)

    Returns:
    List of column dictionaries with keys:
    - name: Column name
    - type: SQLAlchemy type object (see ischema_names mapping)
    - nullable: Boolean nullable flag
    - default: Default value (if any)
    - comment: Column comment (if any)
    """

def get_pk_constraint(self, connection, table_name, schema=None, **kwargs) -> dict:
    """
    Get primary key constraint information.

    Note: ClickHouse doesn't have traditional primary keys,
    returns empty constraint info for compatibility with
    SQLAlchemy's reflection API.
    """

def get_foreign_keys(self, connection, table_name, schema=None, **kwargs) -> list:
    """
    Get foreign key constraints.

    Note: ClickHouse doesn't support foreign keys,
    returns an empty list for compatibility.
    """

def get_indexes(self, connection, table_name, schema=None, **kwargs) -> list:
    """
    Get index information.

    Note: ClickHouse uses different indexing mechanisms (data-skipping
    indexes, the ORDER BY sorting key), so an empty list is returned
    for standard SQL indexes.
    """

DDL Compilation

SQL DDL statement compilation for CREATE TABLE, DROP TABLE, and other schema modification operations.

def visit_create_table(self, create, **kwargs) -> str:
    """
    Compile a CREATE TABLE statement for ClickHouse.

    Handles ClickHouse-specific table engines, partitioning,
    and other table creation options.

    Returns the rendered SQL string.
    """

def visit_drop_table(self, drop, **kwargs) -> str:
    """Compile a DROP TABLE statement; returns the rendered SQL string."""

def visit_create_index(self, create, **kwargs) -> str:
    """
    Handle index creation.

    Note: Translates to ClickHouse-appropriate indexing where applicable.
    """

Usage Examples

Basic SQLAlchemy Engine

from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Create engine with connection URL
engine = create_engine('clickhousedb://default@localhost:8123/default')

# Test connection
with engine.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    version = result.scalar()
    print(f"ClickHouse version: {version}")

# Create session factory
Session = sessionmaker(bind=engine)

# Use the session as a context manager (SQLAlchemy 1.4+) so it is always
# closed; the original bare session.close() was skipped if a query raised.
with Session() as session:
    # Execute raw SQL
    result = session.execute(text("""
    SELECT 
        database,
        count() as table_count
    FROM system.tables 
    GROUP BY database
"""))

    for row in result:
        print(f"Database {row.database}: {row.table_count} tables")

Table Definition and ORM

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float
# declarative_base moved to sqlalchemy.orm in SQLAlchemy 1.4; the old
# sqlalchemy.ext.declarative import path is deprecated and removed in 2.0.
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Event(Base):
    """SQLAlchemy model for events table."""
    __tablename__ = 'events'
    
    id = Column(Integer, primary_key=True)
    event_type = Column(String(100), nullable=False)
    user_id = Column(Integer, nullable=False)
    timestamp = Column(DateTime, nullable=False)
    value = Column(Float)
    
    def __repr__(self):
        return f"<Event(id={self.id}, type='{self.event_type}', user={self.user_id})>"

# Create engine and tables
engine = create_engine('clickhousedb://default@localhost:8123/analytics')

# Note: ClickHouse table creation may require additional engine parameters
# This creates the table structure for SQLAlchemy to work with
Base.metadata.create_all(engine)

# Work with ORM; the context manager guarantees the session is closed even
# if the query raises (the original explicit close() was skipped on error).
Session = sessionmaker(bind=engine)
with Session() as session:
    # Query using ORM
    events = session.query(Event).filter(
        Event.event_type == 'click'
    ).limit(100).all()

    for event in events:
        print(f"{event.timestamp}: {event.event_type} by user {event.user_id}")

Schema Reflection

from sqlalchemy import create_engine, inspect, MetaData, Table

engine = create_engine('clickhousedb://default@localhost:8123/system')

# Reflect schema information.
# sqlalchemy.inspect() is the supported API; the legacy
# reflection.Inspector.from_engine() was deprecated in 1.4 and removed in 2.0.
inspector = inspect(engine)

# Get database names
schemas = inspector.get_schema_names()
print(f"Available databases: {schemas}")

# Get tables in system database
tables = inspector.get_table_names(schema='system')
print(f"System tables: {len(tables)}")

# Reflect specific table structure
table_columns = inspector.get_columns('tables', schema='system')
print("\nColumns in system.tables:")
for col in table_columns:
    print(f"  {col['name']}: {col['type']} ({'nullable' if col['nullable'] else 'not null'})")

# Use reflected table
metadata = MetaData()
tables_table = Table('tables', metadata, autoload_with=engine, schema='system')

with engine.connect() as conn:
    # Query using reflected table
    result = conn.execute(tables_table.select().where(
        tables_table.c.database == 'default'
    ))

    for row in result:
        print(f"Table: {row.name}, Engine: {row.engine}")

Advanced Queries with SQLAlchemy Core

from sqlalchemy import create_engine, text, select, func, and_, or_, desc
from sqlalchemy.sql import column, table

engine = create_engine('clickhousedb://default@localhost:8123/analytics')

# Define table structure for queries (without ORM)
events = table('events',
    column('id'),
    column('event_type'),
    column('user_id'),
    column('timestamp'),
    column('value')
)

with engine.connect() as conn:
    # SQLAlchemy Core query building.
    # select() takes columns as positional arguments in 1.4+/2.0; the
    # legacy select([...]) list form was removed in SQLAlchemy 2.0.
    # Likewise, order_by('event_count DESC') as a raw string is no longer
    # accepted — desc() renders the same ORDER BY ... DESC.
    query = select(
        events.c.event_type,
        func.count().label('event_count'),
        func.avg(events.c.value).label('avg_value')
    ).where(
        and_(
            events.c.timestamp >= '2023-01-01',
            events.c.value.isnot(None)
        )
    ).group_by(events.c.event_type).order_by(desc('event_count'))

    result = conn.execute(query)

    for row in result:
        print(f"{row.event_type}: {row.event_count} events, avg value: {row.avg_value:.2f}")

    # Raw SQL with parameters
    complex_query = text("""
        SELECT 
            toYYYYMM(timestamp) as month,
            event_type,
            uniq(user_id) as unique_users,
            count() as total_events
        FROM events 
        WHERE timestamp >= :start_date 
          AND timestamp < :end_date
        GROUP BY month, event_type
        ORDER BY month, total_events DESC
    """)

    result = conn.execute(complex_query, {
        'start_date': '2023-01-01',
        'end_date': '2023-07-01'
    })

    for row in result:
        print(f"{row.month}: {row.event_type} - {row.unique_users} users, {row.total_events} events")

Connection Pool Configuration

# text is required by check_connection_health below; it was missing from the
# original imports and would raise NameError when the check ran.
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Engine with custom connection pool
engine = create_engine(
    'clickhousedb://analytics_user:password@clickhouse.example.com:8123/analytics',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,  # Validate connections before use
    pool_recycle=3600,   # Recycle connections after 1 hour
    connect_args={
        'compress': 'lz4',
        'settings': {
            'max_threads': 4,
            'max_memory_usage': '2G'
        }
    }
)

# Connection health check
def check_connection_health():
    """Return True if a trivial SELECT 1 round-trip succeeds."""
    try:
        with engine.connect() as conn:
            result = conn.execute(text("SELECT 1"))
            return result.scalar() == 1
    except Exception as e:
        # Broad catch is deliberate here: any failure means "unhealthy".
        print(f"Connection health check failed: {e}")
        return False

if check_connection_health():
    print("Database connection is healthy")
else:
    print("Database connection issues detected")

Integration with Data Analysis Tools

import pandas as pd
from sqlalchemy import create_engine

# Create engine for pandas integration
engine = create_engine('clickhousedb://default@localhost:8123/analytics')

# Pandas DataFrame from SQLAlchemy query.
# read_sql_query accepts any SQLAlchemy connectable (engine or connection).
df = pd.read_sql_query("""
    SELECT 
        user_id,
        event_type,
        COUNT(*) as event_count,
        AVG(value) as avg_value,
        MAX(timestamp) as last_event
    FROM events
    WHERE timestamp >= '2023-01-01'
    GROUP BY user_id, event_type
    ORDER BY event_count DESC
""", engine)

print(f"Loaded {len(df)} rows into DataFrame")
print(df.head())

# Data analysis with pandas: sum per-event-type counts per user, top 10.
top_users = df.groupby('user_id')['event_count'].sum().nlargest(10)
print(f"\nTop 10 users by event count:")
print(top_users)

# Write DataFrame back to ClickHouse via SQLAlchemy.
# Aggregate per event_type; 'nunique' on user_id counts distinct users.
processed_df = df.groupby('event_type').agg({
    'event_count': 'sum',
    'avg_value': 'mean',
    'user_id': 'nunique'
}).reset_index()

# Rename to match the aggregation semantics above.
processed_df.columns = ['event_type', 'total_events', 'mean_value', 'unique_users']

# Insert processed data.
# NOTE(review): if_exists='replace' drops and recreates the table —
# confirm that is intended for the target environment.
processed_df.to_sql(
    'event_summary',
    engine,
    if_exists='replace',
    index=False,
    method='multi'  # Batch insert
)

print("Processed data written to event_summary table")

Install with Tessl CLI

npx tessl i tessl/pypi-clickhouse-connect

docs

client-api.md

data-formats.md

dbapi.md

exceptions.md

index.md

sqlalchemy.md

utilities.md

tile.json