CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-clickhouse-connect

ClickHouse Database Core Driver for Python, Pandas, and Superset

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/utilities.md

Utilities

Development and testing utilities including data generation tools, external data support, configuration management, and helper functions for enhanced developer experience with ClickHouse Connect.

Capabilities

Data Generation Tools

Utilities for generating random test data matching ClickHouse column types for development, testing, and benchmarking purposes.

from clickhouse_connect.tools.datagen import RandomValueDef, random_col_data, random_value_gen

class RandomValueDef(NamedTuple):
    """
    Definition for random value generation parameters.

    Fields:
    - name: Column name
    - ch_type: ClickHouse type string
    - nullable: Whether to generate NULL values
    - low_card: Use low cardinality for string types
    """
    # Fix: the original used the functional NamedTuple form followed by a
    # bare triple-quoted string, which is a no-op expression statement and
    # never becomes the type's __doc__.  The class form attaches the
    # docstring properly and keeps the same fields/order, so it is fully
    # backward compatible for callers.
    name: str
    ch_type: str
    nullable: bool
    low_card: bool

def random_col_data(
    ch_type: str,
    size: int,
    nullable: bool = False
) -> list:
    """
    Produce a list of ``size`` random values for one ClickHouse column.

    Parameters:
    - ch_type: ClickHouse type string such as 'Int32', 'String', 'DateTime'
    - size: How many values to generate
    - nullable: When True, intersperse NULL values in the output

    Returns:
    A list of random values compatible with the requested type

    Type coverage:
    - Integers: Int8/Int16/Int32/Int64 and UInt8/UInt16/UInt32/UInt64
    - Floating point and decimals: Float32, Float64, Decimal
    - Strings: String, FixedString, LowCardinality(String)
    - Temporal: Date, DateTime, DateTime64
    - Bool and Array(T) for supported inner types
    - Tuple, Map, and Nested with basic support

    Example:
    data = random_col_data('Int32', 1000, nullable=True)
    # -> a list of 1000 random integers, some of them NULL
    """

def random_value_gen(
    ch_type: str,
    nullable: bool = False
) -> Generator:
    """
    Build a generator that yields random values of one ClickHouse type.

    Parameters:
    - ch_type: ClickHouse type string
    - nullable: When True, NULLs are mixed into the generated stream

    Returns:
    A generator producing random values of the requested type

    Example:
    gen = random_value_gen('Float64', nullable=True)
    values = [next(gen) for _ in range(100)]
    """

# Additional data generation functions.
# NOTE(review): signatures only; the one-line descriptions below are inferred
# from the function names — confirm against clickhouse_connect.tools.datagen.
# The annotations reference Decimal/datetime/tzinfo/Sequence/ClickHouseType,
# which the real module must import (not shown in this snippet).
def random_float() -> float: ...  # random 64-bit float
def random_float32() -> float: ...  # random float limited to Float32 precision
def random_decimal(prec: int, scale: int) -> Decimal: ...  # random Decimal with given precision/scale
def random_datetime() -> datetime: ...  # random datetime value
def random_datetime_tz(timezone: tzinfo) -> datetime: ...  # random datetime in the given timezone
def random_datetime64(prec: int) -> datetime: ...  # random DateTime64 with sub-second precision prec
def random_ascii_str(max_len: int = 200, min_len: int = 0) -> str: ...  # random ASCII string, length in [min_len, max_len]
def random_utf8_str(max_len: int = 200) -> str: ...  # random UTF-8 string up to max_len characters
def fixed_len_ascii_str(str_len: int = 200) -> str: ...  # ASCII string of exactly str_len characters
def random_ipv6() -> str: ...  # random IPv6 address string
def random_tuple(element_types: Sequence[ClickHouseType], col_def) -> tuple: ...  # random Tuple matching element_types
def random_map(key_type, value_type, sz: int, col_def) -> dict: ...  # random Map with sz entries
def random_nested(keys: Sequence[str], types: Sequence[ClickHouseType], col_def: RandomValueDef) -> dict: ...  # random Nested structure

Testing Utilities

Testing framework utilities including table context managers and test data management for comprehensive testing scenarios.

from clickhouse_connect.tools.testing import TableContext

class TableContext:
    """
    Context manager for creating and cleaning up test tables.

    Provides automatic table lifecycle management for testing,
    ensuring tables are properly created and cleaned up even
    if tests fail or are interrupted.
    """

    def __init__(
        self,
        client,
        table_name: str,
        columns: list[tuple[str, str]],
        engine: str = 'Memory',
        database: str = '',
        cleanup: bool = True
    ):
        """
        Initialize table context for testing.

        Parameters:
        - client: ClickHouse client instance
        - table_name: Name of test table to create
        - columns: List of (column_name, column_type) tuples
        - engine: ClickHouse table engine (default: Memory for testing)
        - database: Target database (uses client default if empty)
        - cleanup: Whether to drop table on context exit

        Example:
        with TableContext(client, 'test_users', [
            ('id', 'UInt32'),
            ('name', 'String'),
            ('created', 'DateTime')
        ]) as table:
            # Use table for testing
            client.insert(table.full_name, test_data)
            result = client.query(f'SELECT * FROM {table.full_name}')
        # Table automatically dropped here
        """

    def __enter__(self) -> 'TableContext':
        """Create table and return context."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up table if cleanup enabled."""

    @property
    def full_name(self) -> str:
        """Get fully qualified table name (database.table)."""

    # Annotation improved: the original declared `list[str]` with a None
    # default; the optional form documents that None is accepted.
    def insert_data(self, data: list, column_names: 'list[str] | None' = None):
        """Insert test data into the table.

        With column_names=None, presumably the table's declared column
        order is used — confirm against the implementation.
        """

    # Return annotation stringified: QueryResult is not imported in this
    # snippet; `str = None` likewise corrected to the optional form.
    def query(self, query: 'str | None' = None) -> 'QueryResult':
        """Query the test table.

        With query=None, presumably a default SELECT over the table is
        issued — confirm against the implementation.
        """

External Data Support

Support for external data sources and temporary tables for complex query scenarios and data integration workflows.

from clickhouse_connect.driver.external import ExternalData, ExternalFile

class ExternalFile:
    """
    External file definition for ClickHouse queries.

    Allows referencing external data files in queries,
    enabling complex data processing scenarios without
    permanently storing data in ClickHouse.
    """

    # Annotations improved: BinaryIO is not imported in this snippet, so the
    # union is stringified; `types` takes a None default and is therefore
    # declared optional.
    def __init__(
        self,
        name: str,
        content: 'bytes | BinaryIO',
        fmt: str = 'TSV',
        types: 'list[str] | None' = None,
        structure: str = ''
    ):
        """
        Initialize external file definition.

        Parameters:
        - name: Logical name for the external file in queries
        - content: File content as bytes or binary stream
        - fmt: Data format (TSV, CSV, JSON, etc.)
        - types: List of ClickHouse type names for columns
        - structure: Column structure string (alternative to types)

        Example:
        external_file = ExternalFile(
            name='lookup_data',
            content=csv_data.encode('utf-8'),
            fmt='CSV',
            types=['String', 'Int32', 'Float64']
        )
        """

class ExternalData:
    """
    Container for external data sources in queries.

    Manages collection of external files and temporary
    tables that can be referenced in ClickHouse queries.
    """

    def __init__(self):
        """Initialize empty external data container."""

    # Annotations improved to match ExternalFile.__init__: BinaryIO is not
    # imported here (stringified) and `types` is optional.
    def add_file(
        self,
        name: str,
        content: 'bytes | BinaryIO',
        fmt: str = 'TSV',
        types: 'list[str] | None' = None,
        structure: str = ''
    ):
        """
        Add external file to the container.

        Parameters match ExternalFile constructor.
        """

    def add_table(
        self,
        name: str,
        data: list[list],
        column_names: list[str],
        column_types: list[str]
    ):
        """
        Add external table data to the container.

        Parameters:
        - name: Logical table name for queries
        - data: Table data as list of rows
        - column_names: Column names
        - column_types: ClickHouse type names for columns
        """

    def clear(self):
        """Remove all external data sources."""

    @property
    def files(self) -> dict[str, ExternalFile]:
        """Get dictionary of external files by name."""

Common Configuration

Centralized configuration management and common settings for ClickHouse Connect applications.

from clickhouse_connect.common import (
    version,
    build_client_name,
    get_setting,
    set_setting,
    CommonSetting
)

def version() -> str:
    """
    Return the installed ClickHouse Connect package version.

    Returns:
    A version string such as '0.8.18'
    """

def build_client_name(client_name: str = '') -> str:
    """
    Assemble the User-Agent string used for HTTP requests.

    Parameters:
    - client_name: Optional custom client identifier placed first

    Returns:
    The full User-Agent string including package and runtime versions

    Example:
    user_agent = build_client_name('MyApp/1.0')
    # -> 'MyApp/1.0 clickhouse-connect/0.8.18 (Python/3.9.0)'
    """

def get_setting(key: str) -> Any:
    """
    Look up a common setting by key.

    Parameters:
    - key: The setting's name

    Returns:
    The setting's current value, falling back to its default when unset

    Recognized settings:
    - autogenerate_session_id: Auto-generate session IDs (bool)
    - readonly: Global readonly mode (str)
    - use_protocol_version: Use enhanced protocol features (bool)
    - max_connection_age: Maximum connection age in seconds (int)
    """

def set_setting(key: str, value: Any):
    """
    Assign a value to a common setting.

    Parameters:
    - key: The setting's name
    - value: The value to store

    Example:
    set_setting('autogenerate_session_id', True)
    set_setting('max_connection_age', 3600)
    """

class CommonSetting:
    """
    Common setting definition with metadata.

    Provides setting definition, validation, and
    default value management for global settings.
    """

    # Annotation improved: the original used the builtin function `callable`
    # as a type; the stringified `Callable | None` form documents intent
    # without requiring a typing import in this snippet.
    def __init__(
        self,
        name: str,
        default_value: Any,
        description: str = '',
        validator: 'Callable | None' = None
    ):
        """
        Initialize common setting definition.

        Parameters:
        - name: Setting name
        - default_value: Default value for the setting
        - description: Human-readable description
        - validator: Optional validation function
        """

Helper Functions and Utilities

Miscellaneous utility functions for data conversion, identifier handling, and common operations.

from clickhouse_connect.driver.binding import quote_identifier
from clickhouse_connect.driver.common import dict_copy, coerce_int, coerce_bool
from clickhouse_connect.driver.tzutil import normalize_timezone

def quote_identifier(identifier: str) -> str:
    """
    Quote ClickHouse identifier if needed.
    
    Parameters:
    - identifier: Table name, column name, or other identifier
    
    Returns:
    Properly quoted identifier for safe use in SQL
    
    Example:
    safe_name = quote_identifier('user-data')  # Returns: `user-data`
    safe_name = quote_identifier('normal_name')  # Returns: normal_name
    """

def dict_copy(source: dict, **kwargs) -> dict:
    """
    Create dictionary copy with optional updates.
    
    Parameters:
    - source: Source dictionary to copy
    - **kwargs: Additional key-value pairs to include/override
    
    Returns:
    New dictionary with copied and updated values
    """

def coerce_int(value: Any) -> int:
    """
    Safely convert value to integer.
    
    Parameters:
    - value: Value to convert
    
    Returns:
    Integer value, or 0 if conversion fails
    """

def coerce_bool(value: Any) -> bool:
    """
    Safely convert value to boolean.
    
    Parameters:
    - value: Value to convert
    
    Returns:
    Boolean value using ClickHouse-style conversion rules
    """

def normalize_timezone(tz) -> tuple[tzinfo, bool]:
    """
    Normalize timezone and check DST safety.
    
    Parameters:
    - tz: Timezone object or string
    
    Returns:
    Tuple of (normalized_timezone, is_dst_safe)
    """

Common Settings Management

Global configuration settings that affect client behavior across all instances and operations.

from clickhouse_connect.common import get_setting, set_setting, version

def version() -> str:
    """
    Report the ClickHouse Connect package version.

    Returns:
    The package version as a string, e.g. '0.8.18'
    """

def get_setting(name: str) -> Any:
    """
    Fetch a global ClickHouse Connect setting by name.

    Parameters:
    - name: The setting's name

    Returns:
    The current value, or None when the setting has not been set

    Recognized settings (defaults in parentheses):
    - 'autogenerate_session_id' -- auto-generate session IDs (True)
    - 'dict_parameter_format' -- dict parameter format, 'json'|'map' ('json')
    - 'invalid_setting_action' -- 'send'|'drop'|'error' ('error')
    - 'max_connection_age' -- max connection reuse time in seconds (600)
    - 'product_name' -- product name for client identification ('')
    - 'send_os_user' -- include OS user in client identification (True)
    - 'use_protocol_version' -- use client protocol version (True)
    - 'max_error_size' -- maximum error message size (1024)
    - 'http_buffer_size' -- HTTP streaming buffer size (10MB)
    """

def set_setting(name: str, value: Any) -> None:
    """
    Update a global ClickHouse Connect setting.

    Parameters:
    - name: Setting name (see get_setting for the recognized names)
    - value: New value for the setting

    Note: Changes affect new client instances created after the setting is changed
    """

def build_client_name(client_name: str = '') -> str:
    """
    Compose the HTTP User-Agent string for requests.

    Parameters:
    - client_name: Custom client name placed at the front of the string

    Returns:
    The complete User-Agent string including version and system info
    """

Usage Examples

Random Data Generation

import clickhouse_connect
from clickhouse_connect.tools.datagen import random_col_data, random_value_gen

client = clickhouse_connect.create_client(host='localhost')

# Generate test data for different column types
user_ids = random_col_data('UInt32', 1000)
user_names = random_col_data('String', 1000)
signup_dates = random_col_data('DateTime', 1000)
scores = random_col_data('Float64', 1000, nullable=True)

# Combine into test dataset
test_data = list(zip(user_ids, user_names, signup_dates, scores))

# Create test table and insert data
client.command("""
    CREATE TABLE test_users (
        id UInt32,
        name String,
        signup_date DateTime,
        score Nullable(Float64)
    ) ENGINE = Memory
""")

client.insert('test_users', test_data, 
              column_names=['id', 'name', 'signup_date', 'score'])

print(f"Inserted {len(test_data)} test records")

# Query test data
result = client.query("SELECT count(), avg(score) FROM test_users WHERE score IS NOT NULL")
count, avg_score = result.first_row()
print(f"Records: {count}, Average score: {avg_score:.2f}")

# Generate streaming test data
def generate_test_batch(batch_id: int, batch_size: int = 1000):
    """Generate a batch of test rows for the test_users table.

    Parameters:
    - batch_id: Batch index; offsets the generated IDs so that rows from
      different batches do not collide
    - batch_size: Number of rows to generate

    Returns:
    List of [id, name, signup_date, score] rows
    """
    id_gen = random_value_gen('UInt32')
    name_gen = random_value_gen('String')
    date_gen = random_value_gen('DateTime')
    # Fix: create the score generator once. The original called
    # random_value_gen('Float64', nullable=True) inside the loop, building
    # and discarding a fresh generator for every single row.
    score_gen = random_value_gen('Float64', nullable=True)

    batch = []
    for _ in range(batch_size):
        batch.append([
            next(id_gen) + batch_id * batch_size,  # Ensure unique IDs
            f"batch_{batch_id}_{next(name_gen)}",
            next(date_gen),
            next(score_gen)
        ])
    return batch

# Insert multiple batches
for batch_id in range(5):
    batch_data = generate_test_batch(batch_id)
    client.insert('test_users', batch_data,
                  column_names=['id', 'name', 'signup_date', 'score'])

print("Inserted 5 batches of test data")

# Clean up
client.command("DROP TABLE test_users")

Table Context Testing

import clickhouse_connect
from clickhouse_connect.tools.testing import TableContext

client = clickhouse_connect.create_client(host='localhost')

def test_user_operations():
    """Test user operations with automatic table cleanup.

    Exercises insert and query paths against a throwaway Memory-engine
    table; TableContext drops the table when the block exits.
    """
    
    with TableContext(
        client,
        'test_users',
        [
            ('id', 'UInt32'),
            ('name', 'String'),
            ('email', 'String'),
            ('created_at', 'DateTime')
        ],
        engine='Memory'
    ) as table:
        
        # Insert test data
        test_users = [
            [1, 'Alice', 'alice@example.com', '2023-01-01 10:00:00'],
            [2, 'Bob', 'bob@example.com', '2023-01-02 11:00:00'],
            [3, 'Carol', 'carol@example.com', '2023-01-03 12:00:00']
        ]
        
        table.insert_data(test_users, ['id', 'name', 'email', 'created_at'])
        
        # Test queries
        # NOTE(review): '{}' is presumably replaced with the table's full
        # name inside TableContext.query — confirm against implementation.
        result = table.query("SELECT count() FROM {}")
        assert result.first_item() == 3, "Should have 3 users"
        
        result = table.query("SELECT name FROM {} WHERE id = 2")
        assert result.first_item() == 'Bob', "User 2 should be Bob"
        
        # Test with WHERE clause
        result = client.query(
            f"SELECT name FROM {table.full_name} WHERE email LIKE '%@example.com'"
        )
        assert len(result.result_set) == 3, "All users have example.com email"
        
        print("All user operation tests passed!")
        
    # Table automatically cleaned up here

def test_with_multiple_tables():
    """Test operations across multiple tables.

    Creates two related tables, inserts users and their orders, and
    verifies an aggregating JOIN across them. Both tables are dropped
    automatically when their context managers exit.
    """

    with TableContext(client, 'users', [('id', 'UInt32'), ('name', 'String')]) as users_table, \
         TableContext(client, 'orders', [('user_id', 'UInt32'), ('amount', 'Float64')]) as orders_table:

        # Insert related data
        users_table.insert_data([[1, 'Alice'], [2, 'Bob']], ['id', 'name'])
        orders_table.insert_data([[1, 99.99], [1, 149.99], [2, 79.99]], ['user_id', 'amount'])

        # Test join query
        result = client.query(f"""
            SELECT u.name, sum(o.amount) as total
            FROM {users_table.full_name} u
            JOIN {orders_table.full_name} o ON u.id = o.user_id
            GROUP BY u.name
            ORDER BY total DESC
        """)

        assert len(result.result_set) == 2, "Should have 2 users with orders"
        # Fix: sum() over Float64 is not exact in binary floating point, so
        # the original equality check (== 249.98) could fail spuriously.
        # Compare with a tolerance instead.
        assert abs(result.result_set[0][1] - 249.98) < 1e-6, "Alice should have highest total"

        print("Multi-table test passed!")
    
    # Both tables automatically cleaned up

# Run tests
test_user_operations()
test_with_multiple_tables()

External Data Integration

import clickhouse_connect
from clickhouse_connect.driver.external import ExternalData
import csv
import io

client = clickhouse_connect.create_client(host='localhost')

def process_with_external_data():
    """Process data using external files in queries.

    Demonstrates combining a CSV lookup file and a programmatic table as
    ClickHouse external data sources within a single query.
    """
    
    # Create CSV data for lookup
    csv_data = """country_code,country_name,region
US,United States,North America
CA,Canada,North America
GB,United Kingdom,Europe
DE,Germany,Europe
JP,Japan,Asia
"""
    
    # Create external data container
    external_data = ExternalData()
    
    # Add CSV file as external data
    # NOTE(review): the first CSV row is a header, but fmt='CSV' with only
    # `types` presumably treats that row as data and leaves columns with
    # generic names, while the query below references named columns
    # (c.country_name etc.) — confirm whether fmt should be 'CSVWithNames'
    # or a `structure` naming the columns should be supplied.
    external_data.add_file(
        name='country_lookup',
        content=csv_data.encode('utf-8'),
        fmt='CSV',
        types=['String', 'String', 'String']
    )
    
    # Add programmatic data as external table
    currency_data = [
        ['US', 'USD', 1.0],
        ['CA', 'CAD', 0.75],
        ['GB', 'GBP', 1.25],
        ['DE', 'EUR', 1.10],
        ['JP', 'JPY', 0.007]
    ]
    
    external_data.add_table(
        name='exchange_rates',
        data=currency_data,
        column_names=['country_code', 'currency', 'rate'],
        column_types=['String', 'String', 'Float64']
    )
    
    # Query combining main data with external data
    result = client.query("""
        SELECT 
            c.country_name,
            c.region,
            e.currency,
            e.rate,
            1000 * e.rate as local_amount
        FROM country_lookup c
        JOIN exchange_rates e ON c.country_code = e.country_code
        ORDER BY c.region, c.country_name
    """, external_data=external_data)
    
    print("Country data with exchange rates:")
    for row in result.result_set:
        country, region, currency, rate, local_amount = row
        print(f"{country} ({region}): {currency} - Rate: {rate}, 1000 USD = {local_amount:.2f} {currency}")

def process_large_external_file():
    """Process large external file efficiently.

    Builds a 10k-row CSV in memory, attaches it as external data, and
    runs an aggregation over it without creating a permanent table.
    """
    
    # Generate large CSV data
    csv_buffer = io.StringIO()
    writer = csv.writer(csv_buffer)
    writer.writerow(['id', 'category', 'value'])
    
    for i in range(10000):
        writer.writerow([i, f'category_{i % 10}', i * 1.5])
    
    csv_content = csv_buffer.getvalue().encode('utf-8')
    
    external_data = ExternalData()
    # NOTE(review): the buffer starts with a header row, but fmt='CSV' with
    # only `types` presumably ingests that row as data and names the
    # columns generically, while the query below references `category` and
    # `value` — confirm whether 'CSVWithNames' or a column-naming
    # `structure` is needed here.
    external_data.add_file(
        name='large_dataset',
        content=csv_content,
        fmt='CSV',
        types=['UInt32', 'String', 'Float64']
    )
    
    # Process external data with aggregation
    result = client.query("""
        SELECT 
            category,
            count() as record_count,
            avg(value) as avg_value,
            sum(value) as total_value
        FROM large_dataset
        GROUP BY category
        ORDER BY category
    """, external_data=external_data)
    
    print("\nLarge dataset aggregation:")
    for category, count, avg_val, total_val in result.result_set:
        print(f"{category}: {count} records, avg: {avg_val:.2f}, total: {total_val:.2f}")

# Run examples
process_with_external_data()
process_large_external_file()

Configuration Management

import clickhouse_connect
from clickhouse_connect.common import (
    get_setting, set_setting, version, build_client_name
)

# Display package information
print(f"ClickHouse Connect version: {version()}")
print(f"Default User-Agent: {build_client_name()}")
print(f"Custom User-Agent: {build_client_name('MyApp/2.0')}")

# Configure global settings
print("\nConfiguring global settings:")
set_setting('autogenerate_session_id', True)
set_setting('max_connection_age', 7200)  # 2 hours

print(f"Auto-generate session ID: {get_setting('autogenerate_session_id')}")
print(f"Max connection age: {get_setting('max_connection_age')} seconds")

# Create client with global settings applied
client = clickhouse_connect.create_client(
    host='localhost',
    client_name='MyApp/2.0'
)

# Check if session ID was auto-generated
session_info = client.command("SELECT sessionId()")
print(f"Session ID: {session_info}")

# Test connection health
if client.ping():
    print("Connection is healthy")
    
    # Get server info
    server_version = client.command("SELECT version()")
    server_uptime = client.command("SELECT uptime()")
    
    print(f"Server version: {server_version}")
    print(f"Server uptime: {server_uptime} seconds")

client.close()

Utility Functions

import clickhouse_connect
from clickhouse_connect.driver.binding import quote_identifier
from clickhouse_connect.driver.common import dict_copy, coerce_int, coerce_bool

client = clickhouse_connect.create_client(host='localhost')

# Safe identifier quoting
table_names = ['users', 'user-data', 'order_items', 'special@table']
for name in table_names:
    quoted = quote_identifier(name)
    print(f"'{name}' -> {quoted}")

# Build dynamic query with safe identifiers
def build_select_query(table_name: str, columns: list[str], where_clause: str = ''):
    """Construct a SELECT statement with every identifier safely quoted.

    Parameters:
    - table_name: Table to select from (quoted via quote_identifier)
    - columns: Column names to project (each quoted)
    - where_clause: Optional raw WHERE predicate appended as-is

    Returns:
    The assembled SQL string
    """
    quoted_columns = ', '.join(quote_identifier(column) for column in columns)
    sql = f"SELECT {quoted_columns} FROM {quote_identifier(table_name)}"
    return f"{sql} WHERE {where_clause}" if where_clause else sql

# Example usage
query = build_select_query(
    'user-data',
    ['user-id', 'full-name', 'created-at'],
    'status = 1'
)
print(f"\nGenerated query: {query}")

# Configuration merging
base_config = {
    'host': 'localhost',
    'port': 8123,
    'compress': 'lz4'
}

# Create variations with dict_copy
dev_config = dict_copy(base_config, database='development', port=8124)
prod_config = dict_copy(base_config, 
                       host='prod.clickhouse.com', 
                       database='production',
                       secure=True)

print(f"\nBase config: {base_config}")
print(f"Dev config: {dev_config}")
print(f"Prod config: {prod_config}")

# Type coercion utilities
test_values = ['123', '0', 'true', 'false', '1', '', None, 42]

print("\nType coercion examples:")
for value in test_values:
    int_val = coerce_int(value)
    bool_val = coerce_bool(value)
    print(f"{repr(value):>8} -> int: {int_val:>3}, bool: {bool_val}")

client.close()

Install with Tessl CLI

npx tessl i tessl/pypi-clickhouse-connect

docs

client-api.md

data-formats.md

dbapi.md

exceptions.md

index.md

sqlalchemy.md

utilities.md

tile.json