CtrlK
Blog Docs Log in Get started
Tessl Logo

tessl/pypi-clickhouse-connect

ClickHouse Database Core Driver for Python, Pandas, and Superset

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/client-api.md

Client API

Core client classes providing the primary interface for executing queries, inserting data, and managing ClickHouse database connections. Supports both synchronous and asynchronous operations with comprehensive configuration options.

Capabilities

Client Factory Functions

Primary functions for creating client instances with full configuration support including authentication, security, compression, and connection management.

def create_client(
    host: str | None = None,
    username: str | None = None,
    password: str = '',
    access_token: str | None = None,
    database: str = '__default__',
    interface: str | None = None,
    port: int = 0,
    secure: bool | str = False,
    dsn: str | None = None,
    settings: dict[str, Any] | None = None,
    generic_args: dict[str, Any] | None = None,
    compress: bool | str = False,
    query_limit: int = 0,
    connect_timeout: int = 10,
    send_receive_timeout: int = 300,
    client_name: str = '',
    verify: bool = True,
    ca_cert: str = '',
    client_cert: str = '',
    client_cert_key: str = '',
    session_id: str = '',
    pool_mgr = None,
    http_proxy: str = '',
    https_proxy: str = '',
    server_host_name: str = '',
    autogenerate_session_id: bool | None = None,
    **kwargs
) -> Client:
    """
    Create a synchronous ClickHouse client.
    
    Parameters:
    - host: ClickHouse server hostname/IP (default: localhost)
    - username: ClickHouse username (default: default user)
    - password: Password for username
    - access_token: JWT access token for ClickHouse Cloud
    - database: Default database name
    - interface: 'http' or 'https' (auto-detected from port/secure)
    - port: HTTP/HTTPS port (default: 8123/8443)
    - secure: Use HTTPS/TLS connection
    - dsn: Data Source Name string for connection parameters
    - settings: ClickHouse server settings dictionary
    - compress: Compression method (True/'lz4', 'zstd', 'brotli', 'gzip')
    - query_limit: Default LIMIT on returned rows (0 = no limit)
    - connect_timeout: HTTP connection timeout in seconds
    - send_receive_timeout: Read timeout in seconds
    - client_name: Client identifier for User-Agent header
    - verify: Verify server certificate in HTTPS mode
    - ca_cert: Certificate Authority root certificate file path
    - client_cert: TLS client certificate file path
    - client_cert_key: Private key for client certificate
    - session_id: ClickHouse session identifier
    - autogenerate_session_id: Auto-generate UUID session ID
    
    Returns:
    HttpClient instance implementing the Client interface
    """

async def create_async_client(
    executor_threads: int | None = None,
    **kwargs
) -> AsyncClient:
    """
    Create an asynchronous ClickHouse client.
    
    Parameters:
    - executor_threads: ThreadPoolExecutor max workers (default: 4 + CPU cores)
    - **kwargs: All parameters from create_client()
    
    Returns:
    AsyncClient wrapping a synchronous client with async interface
    """

Base Client Class

Abstract base class defining the complete ClickHouse client interface with query execution, data insertion, and connection management methods.

class Client:
    """Base ClickHouse Connect client interface."""
    
    # Connection management
    def ping(self) -> bool:
        """Test connection to ClickHouse server."""
    
    def close(self):
        """Close client connection."""
    
    def close_connections(self):
        """Close all pooled connections."""
    
    def min_version(self, version_str: str) -> bool:
        """Check if server version meets minimum requirement."""
    
    # Settings management
    def set_client_setting(self, key: str, value: Any):
        """Set ClickHouse client setting."""
    
    def get_client_setting(self, key: str) -> Any:
        """Get ClickHouse client setting value."""
    
    def set_access_token(self, access_token: str):
        """Update JWT access token."""

Query Methods

Comprehensive query execution methods supporting multiple result formats, streaming, and parameter binding for flexible data retrieval.

def query(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    query_formats: dict | None = None,
    column_formats: dict | None = None,
    encoding: str = 'utf8',
    use_none: bool = True,
    max_str_len: int = 0,
    context: QueryContext | None = None,
    stream_context: StreamContext | None = None
) -> QueryResult:
    """
    Execute SELECT query and return results.
    
    Parameters:
    - query: SQL query string with optional parameter placeholders
    - parameters: Dictionary of query parameters
    - settings: ClickHouse settings for this query
    - query_formats: Column-specific format overrides
    - column_formats: Output format specifications
    - encoding: Text encoding for string columns
    - use_none: Return None for NULL values (vs default values)
    - max_str_len: Maximum string length (0 = unlimited)
    - context: Reusable query context
    - stream_context: Stream processing context
    
    Returns:
    QueryResult with result_set, column_names, column_types, and metadata
    """

def command(
    self,
    cmd: str,
    parameters: dict | None = None,
    data: Any = None,
    use_database: bool = True,
    session_id: str = ''
) -> Any:
    """
    Execute command and return single result value.
    
    Parameters:
    - cmd: Command string
    - parameters: Command parameters
    - data: Optional data payload
    - use_database: Include database in command context
    - session_id: Session identifier for command
    
    Returns:
    Single value result from command execution
    """

def raw_query(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    fmt: str = 'Native',
    use_database: bool = True,
    session_id: str = ''
) -> bytes:
    """
    Execute query and return raw bytes result.
    
    Parameters:
    - query: SQL query string
    - parameters: Query parameters
    - settings: ClickHouse settings
    - fmt: ClickHouse output format
    - use_database: Use default database
    - session_id: Session identifier
    
    Returns:
    Raw bytes response from ClickHouse
    """

Streaming Query Methods

High-performance streaming query methods for processing large datasets with configurable batch sizes and multiple output formats.

def query_column_block_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[Sequence[Sequence], None, None]:
    """
    Stream query results as column-oriented blocks.
    
    Yields:
    Column blocks where each block contains columns as sequences
    """

def query_row_block_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[Sequence[Sequence], None, None]:
    """
    Stream query results as row-oriented blocks.
    
    Yields:
    Row blocks where each block contains multiple rows
    """

def query_rows_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    context: QueryContext | None = None
) -> Generator[Sequence, None, None]:
    """
    Stream query results as individual rows.
    
    Yields:
    Individual result rows
    """

def raw_stream(
    self,
    query: str,
    parameters: dict | None = None,
    settings: dict | None = None,
    fmt: str = 'Native',
    chunk_size: int = 8192
) -> Generator[bytes, None, None]:
    """
    Stream raw query results as bytes chunks.
    
    Parameters:
    - chunk_size: Size of each yielded chunk in bytes
    
    Yields:
    Raw bytes chunks from ClickHouse response
    """

Insert Methods

Data insertion methods supporting various Python data structures and formats with comprehensive options for data type handling and batch processing.

def insert(
    self,
    table: str,
    data: Sequence[Sequence] | BinaryIO,
    column_names: Sequence[str] | None = None,
    database: str = '',
    settings: dict | None = None,
    column_types_dict: dict | None = None,
    column_type_names: Sequence[str] | None = None,
    context: InsertContext | None = None,
    stream_context: StreamContext | None = None
):
    """
    Insert data into ClickHouse table.
    
    Parameters:
    - table: Target table name
    - data: Data as sequence of sequences or binary stream
    - column_names: List of column names (inferred if not provided)
    - database: Target database (uses client default if empty)
    - settings: ClickHouse settings for insert
    - column_types_dict: Column name to ClickHouse type mapping
    - column_type_names: Ordered list of ClickHouse type names
    - context: Reusable insert context
    - stream_context: Stream processing context
    """

def raw_insert(
    self,
    table: str,
    column_names: Sequence[str],
    insert_block: bytes | BinaryIO,
    settings: dict | None = None,
    column_types: Sequence[ClickHouseType] | None = None,
    database: str = ''
):
    """
    Insert pre-formatted binary data.
    
    Parameters:
    - table: Target table name
    - column_names: Column names for insert
    - insert_block: Pre-formatted binary data block
    - settings: ClickHouse settings
    - column_types: ClickHouse type objects for columns
    - database: Target database
    """

Context Creation

Factory methods for creating reusable query and insert contexts to optimize repeated operations with similar parameters.

def create_query_context(
    self,
    settings: dict | None = None,
    query_formats: dict | None = None,
    column_formats: dict | None = None,
    encoding: str = 'utf8',
    use_none: bool = True,
    max_str_len: int = 0,
    **kwargs
) -> QueryContext:
    """
    Create reusable query execution context.
    
    Returns:
    QueryContext object for repeated query operations
    """

def create_insert_context(
    self,
    table: str,
    column_names: Sequence[str] | None = None,
    database: str = '',
    settings: dict | None = None,
    column_types_dict: dict | None = None,
    **kwargs
) -> InsertContext:
    """
    Create reusable insert execution context.
    
    Returns:
    InsertContext object for repeated insert operations
    """

HTTP Client Implementation

Concrete HTTP/HTTPS client implementation with connection pooling, compression, and advanced networking features.

class HttpClient(Client):
    """HTTP/HTTPS implementation of ClickHouse client."""
    
    def __init__(
        self,
        interface: str,
        host: str,
        port: int,
        username: str,
        password: str,
        database: str,
        access_token: str = '',
        **kwargs
    ):
        """
        Initialize HTTP client with connection parameters.
        
        Additional HTTP-specific features:
        - Connection pooling via urllib3.PoolManager
        - Request/response compression
        - SSL/TLS certificate validation
        - HTTP proxy support
        - Custom User-Agent headers
        - Session management
        """

Async Client Wrapper

Asynchronous client wrapper providing async/await interface using ThreadPoolExecutor for non-blocking database operations.

class AsyncClient:
    """Asynchronous wrapper for ClickHouse client operations."""
    
    def __init__(
        self,
        client: Client,
        executor_threads: int | None = None
    ):
        """
        Initialize async client wrapper.
        
        Parameters:
        - client: Synchronous client instance to wrap
        - executor_threads: ThreadPoolExecutor max workers
        """
    
    async def query(self, *args, **kwargs) -> QueryResult:
        """Async version of query method."""
    
    async def insert(self, *args, **kwargs):
        """Async version of insert method."""
    
    async def command(self, *args, **kwargs) -> Any:
        """Async version of command method."""
    
    async def ping(self) -> bool:
        """Async version of ping method."""
    
    async def close(self):
        """Async version of close method."""

Usage Examples

Basic Query Operations

import clickhouse_connect

# Create client
client = clickhouse_connect.create_client(host='localhost')

# Simple query
result = client.query('SELECT count() FROM system.tables')
row_count = result.first_item  # first_item is a property, not a method
print(f"Tables: {row_count}")

# Parameterized query
result = client.query(
    'SELECT name, engine FROM system.tables WHERE database = {db:String}',
    parameters={'db': 'system'}
)

for table_name, engine in result.result_set:
    print(f"{table_name}: {engine}")

# Using command for single values
version = client.command('SELECT version()')
print(f"ClickHouse version: {version}")

Advanced Query Configuration

# Query with custom settings
# (row limits are applied via SQL or via query_limit at client creation;
# query() itself does not accept a query_limit argument)
result = client.query(
    'SELECT * FROM large_table LIMIT 1000',
    settings={
        'max_threads': 4,
        'max_memory_usage': '2G',
        'max_block_size': 65536
    }
)

# Streaming large results
for row_block in client.query_row_block_stream(
    'SELECT * FROM huge_table',
    settings={'max_block_size': 10000}
):
    process_block(row_block)

Data Insertion

# Insert list data
data = [
    ['Alice', 25, 'Engineer'],
    ['Bob', 30, 'Manager'],
    ['Carol', 35, 'Developer']
]

client.insert(
    'employees',
    data,
    column_names=['name', 'age', 'role']
)

# Insert with type specification
client.insert(
    'metrics',
    data,
    column_names=['timestamp', 'value', 'label'],
    column_type_names=['DateTime', 'Float64', 'String']
)

Async Operations

import asyncio
import clickhouse_connect

async def async_operations():
    # Create async client
    client = await clickhouse_connect.create_async_client(
        host='localhost',
        executor_threads=8
    )
    
    # Async query
    result = await client.query('SELECT count() FROM events')
    print(f"Event count: {result.first_item}")  # first_item is a property
    
    # Async insert
    await client.insert('logs', log_data, column_names=['timestamp', 'message'])
    
    # Close connection
    await client.close()

# Run async operations
asyncio.run(async_operations())

Connection Management

# Connection with custom settings
client = clickhouse_connect.create_client(
    host='clickhouse.example.com',
    username='analytics_user',
    password='secure_password',
    database='analytics',
    compress='lz4',
    settings={
        'max_threads': 8,
        'max_memory_usage': '4G'
    }
)

# Test connection
if client.ping():
    print("Connection successful")

# Check server capabilities
if client.min_version('21.8'):
    print("Server supports advanced features")

# Runtime setting changes
client.set_client_setting('send_progress_in_http_headers', 1)

# Clean shutdown
client.close()

Install with Tessl CLI

npx tessl i tessl/pypi-clickhouse-connect

docs

client-api.md

data-formats.md

dbapi.md

exceptions.md

index.md

sqlalchemy.md

utilities.md

tile.json