Python driver with native interface for ClickHouse database providing high-performance connectivity and comprehensive data type support.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Standards-compliant database connectivity following the Python Database API specification (PEP 249). This interface provides familiar cursor-based database interaction patterns compatible with other Python database drivers, making it easy to integrate ClickHouse into existing database applications.
Create database connections using standard DB API 2.0 parameters with support for both DSN and individual parameter formats.
def connect(dsn=None, host=None, user='default', password='',
            port=9000, database='', **kwargs):
    """
    Create a new database connection.

    Parameters:
    - dsn: Data Source Name connection string (default: None)
    - host: ClickHouse server hostname (default: None)
    - user: Username for authentication (default: 'default')
    - password: Password for authentication (default: '')
    - port: ClickHouse server port (default: 9000)
    - database: Default database name (default: '')
    - **kwargs: Additional connection parameters passed to underlying client

    Returns:
    - Connection: DB API 2.0 connection object

    DSN Format:
    - clickhouse://[user[:password]@]host[:port][/database][?param=value]
    - clickhouses:// for SSL connections

    Raises:
    - ValueError: If neither dsn nor host is provided
    """


# DB API 2.0 connection wrapper providing transaction-like interface and
# cursor factory.
class Connection:
    def cursor(self, cursor_factory=None):
        """
        Create a new cursor for executing queries.

        Parameters:
        - cursor_factory: Optional cursor factory function (unused)

        Returns:
        - Cursor: New cursor instance for query execution
        """

    def close(self):
        """Close the connection and free resources."""

    def commit(self):
        """
        Commit pending transaction (no-op for ClickHouse).

        Note: ClickHouse doesn't support transactions, so this is a no-op
        for DB API 2.0 compatibility.
        """

    def rollback(self):
        """
        Rollback pending transaction (no-op for ClickHouse).

        Note: ClickHouse doesn't support transactions, so this is a no-op
        for DB API 2.0 compatibility.
        """


# DB API 2.0 cursor for query execution with standard fetch methods and
# result processing.
class Cursor:
    def execute(self, operation, parameters=None):
        """
        Execute a single query with optional parameters.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)

        Parameter Formats:
        - Dict: {'param': value} for %(param)s placeholders (pyformat style)
        - Sequence: [value1, value2] for %s placeholders
          NOTE(review): %s is the PEP 249 'format' style rather than
          'pyformat'; confirm positional placeholders are supported
          before relying on them.
        """

    def executemany(self, operation, seq_of_parameters):
        """
        Execute query multiple times with different parameter sets.

        Parameters:
        - operation: SQL query string
        - seq_of_parameters: Sequence of parameter sets

        Note: Optimized for INSERT operations with multiple value sets
        """

    def fetchone(self):
        """
        Fetch single row from query results.

        Returns:
        - Tuple: Single result row, or None if no more rows
        """

    def fetchmany(self, size=None):
        """
        Fetch multiple rows from query results.

        Parameters:
        - size: Number of rows to fetch (default: cursor.arraysize)

        Returns:
        - List[Tuple]: List of result rows (may be fewer than size)
        """

    def fetchall(self):
        """
        Fetch all remaining rows from query results.

        Returns:
        - List[Tuple]: All remaining result rows
        """

    def close(self):
        """Close cursor and free resources."""

    def setinputsizes(self, sizes):
        """
        Set input parameter sizes (no-op for compatibility).

        Parameters:
        - sizes: Parameter size specifications (ignored)
        """

    def setoutputsize(self, size, column=None):
        """
        Set output column size (no-op for compatibility).

        Parameters:
        - size: Column size specification (ignored)
        - column: Column index (ignored)
        """


# Access query metadata and result information through standard DB API 2.0
# properties.
class Cursor:
    @property
    def description(self):
        """
        Column description information.

        Returns:
        - List[Tuple]: Column metadata as (name, type_code, display_size,
          internal_size, precision, scale, null_ok) tuples

        Note: Only name and type_code are meaningful for ClickHouse
        """

    @property
    def rowcount(self):
        """
        Number of rows affected by last operation.

        Returns:
        - int: Row count for INSERT/UPDATE/DELETE, -1 for SELECT
        """

    @property
    def columns_with_types(self):
        """
        Column names with ClickHouse type information (non-standard).

        Returns:
        - List[Tuple]: Column information as (name, clickhouse_type) tuples
        """


# ClickHouse-specific functionality extending standard DB API 2.0 interface.
class Cursor:
    def set_stream_results(self, stream_results, max_row_buffer=None):
        """
        Enable streaming results for memory-efficient processing.

        Parameters:
        - stream_results: Enable streaming mode
        - max_row_buffer: Maximum rows to buffer (optional)
        """

    def set_settings(self, settings):
        """
        Set ClickHouse-specific query settings.

        Parameters:
        - settings: Dictionary of ClickHouse settings
        """

    def set_types_check(self, types_check):
        """
        Enable strict type checking for parameters.

        Parameters:
        - types_check: Enable type validation
        """

    def set_external_table(self, name, structure, data):
        """
        Add external table for query execution.

        Parameters:
        - name: Table name for use in queries
        - structure: List of (column_name, type) tuples
        - data: Table data as list of tuples
        """

    def set_query_id(self, query_id):
        """
        Set unique identifier for query tracking.

        Parameters:
        - query_id: Unique query identifier string
        """


# Connection and cursor objects support context manager protocol for
# automatic resource cleanup.
class Connection:
    def __enter__(self):
        """Enter context manager and hand back this connection."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context manager and close connection.

        Nothing is returned, so an exception raised inside the ``with``
        body is not suppressed. Relies on ``close()``, which is not
        defined in this snippet.
        """
        self.close()
class Cursor:
    def __enter__(self):
        """Enter context manager and hand back this cursor."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Exit context manager and close cursor.

        Nothing is returned, so an exception raised inside the ``with``
        body is not suppressed. Relies on ``close()``, which is not
        defined in this snippet.
        """
        self.close()


# Standard module-level constants providing interface metadata.
apilevel = '2.0'  # DB API specification level
threadsafety = 2  # Thread safety level (connections may be shared)
paramstyle = 'pyformat'  # Parameter placeholder style (%(name)s)

from clickhouse_driver import connect

# Connect with individual parameters
conn = connect(host='localhost', user='default', database='mydb')
cursor = conn.cursor()

# Execute simple query
cursor.execute('SELECT version()')
result = cursor.fetchone()
print(result[0])

# Execute query with parameters
cursor.execute('SELECT * FROM users WHERE age > %(min_age)s', {'min_age': 25})
rows = cursor.fetchall()
for row in rows:
    print(row)

cursor.close()
conn.close()

# Connect using Data Source Name
conn = connect('clickhouse://user:pass@localhost:9000/mydb')
cursor = conn.cursor()
cursor.execute('SELECT count() FROM large_table')
count = cursor.fetchone()[0]
print(f"Table has {count} rows")
cursor.close()
conn.close()

# Automatic resource cleanup
with connect(host='localhost') as conn:
    with conn.cursor() as cursor:
        cursor.execute('SELECT * FROM system.tables LIMIT 5')
        tables = cursor.fetchall()
        for table in tables:
            print(table[0])  # table name
# Connection and cursor automatically closed

with connect(host='localhost') as conn:
    with conn.cursor() as cursor:
        # Create table
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS test_insert (
                id UInt32,
                name String,
                value Float64
            ) ENGINE = Memory
        ''')

        # Bulk insert with executemany
        data = [
            (1, 'Alice', 3.14),
            (2, 'Bob', 2.71),
            (3, 'Charlie', 1.41)
        ]
        cursor.executemany('INSERT INTO test_insert VALUES', data)
        print(f"Inserted {cursor.rowcount} rows")

with connect(host='localhost') as conn:
    with conn.cursor() as cursor:
        # Enable streaming for large result sets
        cursor.set_stream_results(True, max_row_buffer=1000)
        cursor.execute('SELECT * FROM large_table')

        # Process results in chunks
        while True:
            rows = cursor.fetchmany(100)
            if not rows:
                break
            for row in rows:
                # process_row is not defined in this example; supply your
                # own per-row handler.
                process_row(row)

with connect(host='localhost') as conn:
    with conn.cursor() as cursor:
        # Set ClickHouse settings
        cursor.set_settings({
            'max_memory_usage': 10000000000,
            'max_execution_time': 60
        })

        # Enable type checking
        cursor.set_types_check(True)

        # Set query ID for tracking
        cursor.set_query_id('my_query_123')

        # Add external table
        cursor.set_external_table(
            'temp_data',
            [('id', 'UInt32'), ('name', 'String')],
            [(1, 'Alice'), (2, 'Bob')]
        )

        cursor.execute('''
            SELECT main.*, temp.name as temp_name
            FROM main_table main
            JOIN temp_data temp ON main.id = temp.id
        ''')

        # Access column information
        columns = cursor.columns_with_types
        print("Columns:", columns)

        results = cursor.fetchall()
        print("Results:", results)

from clickhouse_driver import connect
from clickhouse_driver.dbapi.errors import DatabaseError, OperationalError

try:
    with connect(host='localhost') as conn:
        with conn.cursor() as cursor:
            cursor.execute('SELECT invalid_function()')
except OperationalError as e:
    print(f"Query execution error: {e}")
except DatabaseError as e:
    print(f"Database error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

# Install with Tessl CLI
#   npx tessl i tessl/pypi-clickhouse-driver