CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-clickhouse-connect

ClickHouse Database Core Driver for Python, Pandas, and Superset

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/dbapi.md

DB-API Interface

Standard Python Database API 2.0 implementation providing Connection and Cursor objects for compatibility with existing database tools and frameworks. Enables seamless integration with applications that expect standard database interfaces.

Capabilities

Module Constants

DB-API 2.0 compliance constants defining the interface capabilities and parameter formatting style.

# PEP 249 module-level globals describing the driver's DB-API support.
apilevel: str = '2.0'
"""DB-API version level (2.0 compliance)."""

# Level 2 covers the module and connections only; it stops short of PEP 249
# level 3, so cursors are not declared safe to share between threads.
threadsafety: int = 2
"""Thread safety level: threads may share the module and connections."""

# With 'pyformat', placeholders are written as %(name)s and the values are
# supplied by name, as in the execute() examples in this document.
paramstyle: str = 'pyformat'
"""Parameter style: Python extended format codes (e.g., %(name)s)."""

Connection Factory

Primary function for creating DB-API compatible connection objects with ClickHouse server configuration.

def connect(
    host: str | None = None,
    database: str | None = None,
    username: str | None = '',
    password: str | None = '',
    port: int | None = None,
    secure: bool = False,
    **kwargs
) -> Connection:
    """
    Create a DB-API 2.0 compatible connection to ClickHouse.

    Parameters:
    - host: ClickHouse server hostname (default: localhost)
    - database: Database name (default: default database for user)
    - username: Username for authentication (default: 'default')
    - password: Password for authentication (signature default: empty string)
    - port: Server port (default: 8123 for HTTP, 8443 for HTTPS)
    - secure: Use HTTPS connection (default: False)
    - **kwargs: Additional connection parameters passed to create_client()

    NOTE(review): in the signature host, database and port default to None
    and username to '', so the defaults listed above are presumably applied
    downstream of this function -- confirm against create_client().

    Returns:
    Connection object implementing DB-API 2.0 interface

    Example:
    import clickhouse_connect.dbapi

    conn = clickhouse_connect.dbapi.connect(
        host='localhost',
        database='analytics',
        username='analyst',
        password='secret'
    )
    """

Base Exception Classes

Exception hierarchy following DB-API 2.0 specification for consistent error handling across database applications.

class Error(Exception):
    """Root of the DB-API error hierarchy; catch it to handle any database error."""

Connection Class

DB-API 2.0 compatible connection object providing cursor creation, connection lifecycle methods, and commit/rollback methods that are no-ops because ClickHouse does not support transactions.

class Connection:
    """
    DB-API 2.0 compatible connection to ClickHouse database.

    In addition to the standard close(), commit(), rollback() and cursor()
    methods, the class exposes command() and raw_query(), which are not part
    of the PEP 249 connection interface.
    """

    def close(self):
        """
        Close the connection permanently.

        After calling close(), the connection object and any cursor
        objects created from it are unusable.
        """

    def commit(self):
        """
        Commit any pending transaction.

        Note: ClickHouse does not support transactions, so this is a no-op
        kept for DB-API compatibility. All changes are automatically
        committed.
        """

    def rollback(self):
        """
        Rollback any pending transaction.

        Note: ClickHouse does not support transactions, so this is a no-op
        kept for DB-API compatibility. Changes cannot be rolled back, so
        calling this method does not undo earlier statements.
        """

    def cursor(self) -> Cursor:
        """
        Create a new cursor object using the connection.

        Returns:
        Cursor object for executing statements and fetching results
        """

    def command(self, cmd: str, parameters: dict | None = None) -> Any:
        """
        Execute a command and return the result.

        Parameters:
        - cmd: Command string to execute
        - parameters: Optional parameters dictionary (default: None)

        Returns:
        Command result value
        """

    def raw_query(
        self,
        query: str,
        parameters: dict | None = None,
        settings: dict | None = None,
        fmt: str = 'Native'
    ) -> bytes:
        """
        Execute raw query and return bytes result.

        Parameters:
        - query: SQL query string
        - parameters: Query parameters (default: None)
        - settings: ClickHouse settings (default: None)
        - fmt: Output format (default: 'Native')

        Returns:
        Raw bytes response from ClickHouse
        """

Cursor Class

DB-API 2.0 compatible cursor object for executing statements and fetching results with standard interface methods.

class Cursor:
    """
    DB-API 2.0 compatible cursor for ClickHouse operations.

    Statements are run with execute() / executemany(); the rows of the
    query result are then read with fetchone(), fetchmany() or fetchall().
    """

    # Cursor properties
    @property
    def description(self) -> Sequence[Sequence] | None:
        """
        Column description for the last executed query.

        Returns:
        Sequence of (name, type_code, display_size, internal_size,
        precision, scale, null_ok) tuples, or None if no query executed.
        """

    @property
    def rowcount(self) -> int:
        """
        Number of rows affected by the last operation.

        Returns:
        Row count, or -1 if not available
        """

    @property
    def arraysize(self) -> int:
        """
        Default number of rows to fetch at a time with fetchmany().

        Default value is 1, can be modified by application.
        """

    @arraysize.setter
    def arraysize(self, size: int):
        """
        Set the arraysize property.

        Parameters:
        - size: New default number of rows for fetchmany()
        """

    # Cursor methods
    def close(self):
        """
        Close the cursor permanently.

        After calling close(), the cursor object is unusable.
        The cursor is automatically closed when deleted.
        """

    def execute(
        self,
        operation: str,
        parameters: dict | Sequence | None = None
    ):
        """
        Execute a database operation (query or command).

        Parameters:
        - operation: SQL statement string
        - parameters: Parameters for the operation (dict or sequence,
          default: None)

        Supports parameter substitution using %(name)s format for dict
        parameters or positional parameters for sequence parameters.

        NOTE(review): the placeholder syntax for sequence parameters is not
        shown in this document -- confirm it against the driver before
        relying on positional substitution.

        Example:
        cursor.execute(
            "SELECT * FROM users WHERE age > %(min_age)s",
            {'min_age': 25}
        )
        """

    def executemany(
        self,
        operation: str,
        seq_of_parameters: Sequence[dict | Sequence]
    ):
        """
        Execute operation multiple times with different parameters.

        Parameters:
        - operation: SQL statement string
        - seq_of_parameters: Sequence of parameter dicts or sequences,
          one entry per execution

        Equivalent to calling execute() multiple times but may be
        optimized for batch operations.

        Example:
        cursor.executemany(
            "INSERT INTO users (name, age) VALUES (%(name)s, %(age)s)",
            [
                {'name': 'Alice', 'age': 25},
                {'name': 'Bob', 'age': 30}
            ]
        )
        """

    def fetchone(self) -> Sequence | None:
        """
        Fetch the next row from the query result.

        Returns:
        Single row as sequence, or None if no more rows available

        Example:
        cursor.execute("SELECT name, age FROM users")
        row = cursor.fetchone()
        if row:
            name, age = row
            print(f"{name}: {age}")
        """

    def fetchmany(self, size: int | None = None) -> Sequence[Sequence]:
        """
        Fetch multiple rows from the query result.

        Parameters:
        - size: Number of rows to fetch (default: None, which defaults to
          arraysize)

        Returns:
        Sequence of rows, each row as a sequence

        Example:
        cursor.execute("SELECT * FROM products")
        while True:
            rows = cursor.fetchmany(100)
            if not rows:
                break
            process_batch(rows)
        """

    def fetchall(self) -> Sequence[Sequence]:
        """
        Fetch all remaining rows from the query result.

        Returns:
        Sequence of all remaining rows, each row as a sequence

        Warning: Can consume large amounts of memory for big result sets.
        Consider using fetchmany() for large queries.

        Example:
        cursor.execute("SELECT id, name FROM categories")
        all_categories = cursor.fetchall()
        for cat_id, cat_name in all_categories:
            print(f"{cat_id}: {cat_name}")
        """

Usage Examples

Basic DB-API Operations

import clickhouse_connect.dbapi

# Connect to ClickHouse
conn = clickhouse_connect.dbapi.connect(
    host='localhost',
    database='analytics',
    username='analyst'
)

# Create cursor
cursor = conn.cursor()

try:
    # Execute query; count() yields a single row with a single column
    cursor.execute("SELECT count() FROM events")
    row_count = cursor.fetchone()[0]
    print(f"Total events: {row_count}")

    # Parameterized query: the value is bound by name via %(start_date)s
    cursor.execute(
        "SELECT event_type, count() FROM events WHERE date >= %(start_date)s GROUP BY event_type",
        {'start_date': '2023-01-01'}
    )

    # Fetch results
    for event_type, count in cursor.fetchall():
        print(f"{event_type}: {count}")
finally:
    # Clean up even when one of the queries above raises
    cursor.close()
    conn.close()

Transaction-Style Operations

import clickhouse_connect.dbapi

conn = clickhouse_connect.dbapi.connect(host='localhost')
cursor = conn.cursor()

# Statement and its named parameters, kept apart from the call for readability
insert_sql = "INSERT INTO logs (timestamp, level, message) VALUES (%(ts)s, %(level)s, %(msg)s)"
log_entry = {
    'ts': '2023-12-01 10:00:00',
    'level': 'INFO',
    'msg': 'Application started',
}

try:
    # The insert is auto-committed by ClickHouse as soon as it runs
    cursor.execute(insert_sql, log_entry)

    # commit() is a no-op, called only to mirror the usual DB-API flow
    conn.commit()
    print(f"Inserted {cursor.rowcount} rows")

except Exception as exc:
    # rollback() is likewise a no-op; nothing is undone here
    conn.rollback()
    print(f"Error: {exc}")
finally:
    cursor.close()
    conn.close()

Batch Operations

import clickhouse_connect.dbapi

conn = clickhouse_connect.dbapi.connect(host='localhost')
cursor = conn.cursor()

# Rows to insert; each dict supplies the named placeholders of the statement
user_data = [
    {'name': 'Alice', 'age': 25, 'city': 'NYC'},
    {'name': 'Bob', 'age': 30, 'city': 'LA'},
    {'name': 'Carol', 'age': 35, 'city': 'Chicago'}
]

try:
    # Batch insert using executemany
    cursor.executemany(
        "INSERT INTO users (name, age, city) VALUES (%(name)s, %(age)s, %(city)s)",
        user_data
    )

    print(f"Inserted {len(user_data)} users")

    # Query with result iteration
    cursor.execute("SELECT name, age, city FROM users ORDER BY age")

    # Process results in batches
    cursor.arraysize = 100  # Set batch size for fetchmany()
    # fetchmany() returns an empty sequence once the result is exhausted
    while batch := cursor.fetchmany():
        for name, age, city in batch:
            print(f"{name} ({age}) from {city}")
finally:
    # Release the cursor and connection even if a statement above fails
    cursor.close()
    conn.close()

Integration with Database Tools

# Example with a hypothetical ORM or database tool
import clickhouse_connect.dbapi

def get_connection():
    """Open and return a DB-API connection using the example's fixed settings."""
    # Placeholder values for the example; every key is passed to connect()
    # as a keyword argument.
    settings = {
        'host': 'clickhouse.example.com',
        'database': 'production',
        'username': 'app_user',
        'password': 'secure_password',
    }
    return clickhouse_connect.dbapi.connect(**settings)

class DatabaseManager:
    """Example database manager using DB-API interface.

    The manager owns one connection obtained from get_connection() and opens
    a short-lived cursor for every call. It can be used as a context manager
    so the connection is closed when the ``with`` block exits, including when
    the block raises; calling close() explicitly keeps working as before.
    """

    def __init__(self):
        self.conn = get_connection()

    def __enter__(self):
        """Return the manager itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the connection on exit; exceptions are never suppressed."""
        self.close()
        return False

    def execute_query(self, sql, params=None):
        """Execute query and return all results.

        Parameters:
        - sql: SQL statement string
        - params: Optional parameters passed through to cursor.execute()

        Returns:
        Tuple of (rows from fetchall(), cursor.description)
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, params)
            # Both values are read before the cursor is closed in `finally`
            return cursor.fetchall(), cursor.description
        finally:
            cursor.close()

    def execute_command(self, sql, params=None):
        """Execute command and return row count.

        Parameters:
        - sql: SQL statement string
        - params: Optional parameters passed through to cursor.execute()

        Returns:
        The cursor's rowcount after the statement has run
        """
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, params)
            return cursor.rowcount
        finally:
            cursor.close()

    def close(self):
        """Close database connection."""
        self.conn.close()

# Usage
db = DatabaseManager()

try:
    # Query data
    results, description = db.execute_query(
        "SELECT product_id, sum(quantity) as total FROM orders GROUP BY product_id"
    )

    # The first item of each description entry is the column name
    column_names = [desc[0] for desc in description]
    print(f"Columns: {column_names}")

    for row in results:
        print(dict(zip(column_names, row)))
finally:
    # Release the connection even if the query or the printing fails
    db.close()

Error Handling

import clickhouse_connect.dbapi

conn = clickhouse_connect.dbapi.connect(host='localhost')
cursor = conn.cursor()

# The table is deliberately missing so that the statement fails
bad_query = "SELECT * FROM non_existent_table"

try:
    cursor.execute(bad_query)

except clickhouse_connect.dbapi.Error as db_error:
    # NOTE(review): confirm that the driver's query errors derive from
    # clickhouse_connect.dbapi.Error; if they do not, they are reported by
    # the generic handler below instead.
    print(f"Database error: {db_error}")

except Exception as other_error:
    print(f"Unexpected error: {other_error}")

finally:
    # Runs on success and on failure alike
    cursor.close()
    conn.close()

Connection Context Manager

import clickhouse_connect.dbapi
from contextlib import contextmanager

@contextmanager
def get_cursor():
    """Context manager for database cursor.

    Opens a connection and a cursor, yields the cursor, and closes both when
    the ``with`` block exits. The two resources are released independently:
    the connection is closed even if creating or closing the cursor raises.
    """
    conn = clickhouse_connect.dbapi.connect(host='localhost')
    try:
        cursor = conn.cursor()
        try:
            yield cursor
        finally:
            cursor.close()
    finally:
        # Outer `finally` so a failure in cursor() or cursor.close() cannot
        # leave the connection open
        conn.close()

# Usage with context manager
with get_cursor() as cursor:
    cursor.execute("SELECT database()")
    current_db = cursor.fetchone()[0]
    print(f"Current database: {current_db}")

    # system.tables has one row per table known to the server, across all
    # databases, so this count is a server-wide total rather than the number
    # of tables in the `system` database.
    cursor.execute("SELECT count() FROM system.tables")
    table_count = cursor.fetchone()[0]
    print(f"Total tables on server: {table_count}")

Install with Tessl CLI

npx tessl i tessl/pypi-clickhouse-connect

docs

client-api.md

data-formats.md

dbapi.md

exceptions.md

index.md

sqlalchemy.md

utilities.md

tile.json