ClickHouse Database Core Driver for Python, Pandas, and Superset
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Standard Python Database API 2.0 implementation providing Connection and Cursor objects for compatibility with existing database tools and frameworks. Enables seamless integration with applications that expect standard database interfaces.
DB-API 2.0 compliance constants defining the interface capabilities and parameter formatting style.
apilevel: str = '2.0'
"""DB-API version level (2.0 compliance)."""
threadsafety: int = 2
"""Thread safety level: threads may share the module and connections."""
paramstyle: str = 'pyformat'
"""Parameter style: Python extended format codes (e.g., %(name)s)."""

Primary function for creating DB-API compatible connection objects with ClickHouse server configuration.
def connect(
host: str | None = None,
database: str | None = None,
username: str | None = '',
password: str | None = '',
port: int | None = None,
secure: bool = False,
**kwargs
) -> Connection:
"""
Create a DB-API 2.0 compatible connection to ClickHouse.
Parameters:
- host: ClickHouse server hostname (default: localhost)
- database: Database name (default: default database for user)
- username: Username for authentication (default: '' — when empty, the ClickHouse 'default' user is used)
- password: Password for authentication
- port: Server port (default: 8123 for HTTP, 8443 for HTTPS)
- secure: Use HTTPS connection
- **kwargs: Additional connection parameters passed to create_client()
Returns:
Connection object implementing DB-API 2.0 interface
Example:
conn = clickhouse_connect.dbapi.connect(
host='localhost',
database='analytics',
username='analyst',
password='secret'
)
"""

Exception hierarchy following DB-API 2.0 specification for consistent error handling across database applications.
class Error(Exception):
"""Base class for all database errors."""
pass

DB-API 2.0 compatible connection object providing transaction management, cursor creation, and connection lifecycle methods.
class Connection:
"""DB-API 2.0 compatible connection to ClickHouse database."""
def close(self):
"""
Close the connection permanently.
After calling close(), the connection object and any cursor
objects created from it are unusable.
"""
def commit(self):
"""
Commit any pending transaction.
Note: ClickHouse does not support transactions, so this is a no-op
for compatibility. All changes are automatically committed.
"""
def rollback(self):
"""
Rollback any pending transaction.
Note: ClickHouse does not support transactions, so this is a no-op
for compatibility. Changes cannot be rolled back.
"""
def cursor(self) -> Cursor:
"""
Create a new cursor object using the connection.
Returns:
Cursor object for executing statements and fetching results
"""
def command(self, cmd: str, parameters: dict | None = None) -> Any:
"""
Execute a command and return the result.
Parameters:
- cmd: Command string to execute
- parameters: Optional parameters dictionary
Returns:
Command result value
"""
def raw_query(
self,
query: str,
parameters: dict | None = None,
settings: dict | None = None,
fmt: str = 'Native'
) -> bytes:
"""
Execute raw query and return bytes result.
Parameters:
- query: SQL query string
- parameters: Query parameters
- settings: ClickHouse settings
- fmt: Output format
Returns:
Raw bytes response from ClickHouse
"""

DB-API 2.0 compatible cursor object for executing statements and fetching results with standard interface methods.
class Cursor:
"""DB-API 2.0 compatible cursor for ClickHouse operations."""
# Cursor properties
@property
def description(self) -> Sequence[Sequence] | None:
"""
Column description for the last executed query.
Returns:
Sequence of (name, type_code, display_size, internal_size,
precision, scale, null_ok) tuples, or None if no query executed.
"""
@property
def rowcount(self) -> int:
"""
Number of rows affected by the last operation.
Returns:
Row count, or -1 if not available
"""
@property
def arraysize(self) -> int:
"""
Default number of rows to fetch at a time with fetchmany().
Default value is 1, can be modified by application.
"""
@arraysize.setter
def arraysize(self, size: int):
"""Set the arraysize property."""
# Cursor methods
def close(self):
"""
Close the cursor permanently.
After calling close(), the cursor object is unusable.
The cursor is automatically closed when deleted.
"""
def execute(
self,
operation: str,
parameters: dict | Sequence | None = None
):
"""
Execute a database operation (query or command).
Parameters:
- operation: SQL statement string
- parameters: Parameters for the operation (dict or sequence)
Supports parameter substitution using %(name)s format for dict
parameters or positional parameters for sequence parameters.
Example:
cursor.execute(
"SELECT * FROM users WHERE age > %(min_age)s",
{'min_age': 25}
)
"""
def executemany(
self,
operation: str,
seq_of_parameters: Sequence[dict | Sequence]
):
"""
Execute operation multiple times with different parameters.
Parameters:
- operation: SQL statement string
- seq_of_parameters: Sequence of parameter dicts or sequences
Equivalent to calling execute() multiple times but may be
optimized for batch operations.
Example:
cursor.executemany(
"INSERT INTO users (name, age) VALUES (%(name)s, %(age)s)",
[
{'name': 'Alice', 'age': 25},
{'name': 'Bob', 'age': 30}
]
)
"""
def fetchone(self) -> Sequence | None:
"""
Fetch the next row from the query result.
Returns:
Single row as sequence, or None if no more rows available
Example:
cursor.execute("SELECT name, age FROM users")
row = cursor.fetchone()
if row:
name, age = row
print(f"{name}: {age}")
"""
def fetchmany(self, size: int | None = None) -> Sequence[Sequence]:
"""
Fetch multiple rows from the query result.
Parameters:
- size: Number of rows to fetch (defaults to arraysize)
Returns:
Sequence of rows, each row as a sequence
Example:
cursor.execute("SELECT * FROM products")
while True:
rows = cursor.fetchmany(100)
if not rows:
break
process_batch(rows)
"""
def fetchall(self) -> Sequence[Sequence]:
"""
Fetch all remaining rows from the query result.
Returns:
Sequence of all remaining rows, each row as a sequence
Warning: Can consume large amounts of memory for big result sets.
Consider using fetchmany() for large queries.
Example:
cursor.execute("SELECT id, name FROM categories")
all_categories = cursor.fetchall()
for cat_id, cat_name in all_categories:
print(f"{cat_id}: {cat_name}")
"""

import clickhouse_connect.dbapi
# Basic usage: open a connection, run a plain query and a parameterized
# query through one cursor, then release the cursor and the connection.

# Connect to ClickHouse
conn = clickhouse_connect.dbapi.connect(
    host='localhost',
    database='analytics',
    username='analyst'
)

# Create cursor
cursor = conn.cursor()

# Execute query
cursor.execute("SELECT count() FROM events")
row_count = cursor.fetchone()[0]
print(f"Total events: {row_count}")

# Parameterized query
cursor.execute(
    "SELECT event_type, count() FROM events WHERE date >= %(start_date)s GROUP BY event_type",
    {'start_date': '2023-01-01'}
)

# Fetch results
for event_type, count in cursor.fetchall():
    print(f"{event_type}: {count}")

# Clean up
cursor.close()
conn.close()


import clickhouse_connect.dbapi
# Insert one row with named parameters and walk through the DB-API
# commit/rollback calls; the cursor and connection are always released.
conn = clickhouse_connect.dbapi.connect(host='localhost')
cursor = conn.cursor()

try:
    # Insert data (auto-committed in ClickHouse)
    cursor.execute(
        "INSERT INTO logs (timestamp, level, message) VALUES (%(ts)s, %(level)s, %(msg)s)",
        {
            'ts': '2023-12-01 10:00:00',
            'level': 'INFO',
            'msg': 'Application started'
        }
    )

    # "Commit" (no-op for ClickHouse compatibility)
    conn.commit()
    print(f"Inserted {cursor.rowcount} rows")
except Exception as e:
    # "Rollback" (no-op for ClickHouse)
    conn.rollback()
    print(f"Error: {e}")
finally:
    cursor.close()
    conn.close()


import clickhouse_connect.dbapi
# Batch insert with executemany(), then read the rows back in
# arraysize-sized batches with fetchmany().
conn = clickhouse_connect.dbapi.connect(host='localhost')
cursor = conn.cursor()

# Batch insert using executemany
user_data = [
    {'name': 'Alice', 'age': 25, 'city': 'NYC'},
    {'name': 'Bob', 'age': 30, 'city': 'LA'},
    {'name': 'Carol', 'age': 35, 'city': 'Chicago'}
]
cursor.executemany(
    "INSERT INTO users (name, age, city) VALUES (%(name)s, %(age)s, %(city)s)",
    user_data
)
print(f"Inserted {len(user_data)} users")

# Query with result iteration
cursor.execute("SELECT name, age, city FROM users ORDER BY age")

# Process results in batches
cursor.arraysize = 100  # Set batch size for fetchmany()
# An empty batch is falsy, which ends the loop once the result is exhausted.
while batch := cursor.fetchmany():
    for name, age, city in batch:
        print(f"{name} ({age}) from {city}")

cursor.close()
conn.close()


# Example with a hypothetical ORM or database tool
import clickhouse_connect.dbapi
def get_connection():
    """Factory function for database connections.

    Connection settings are read from the environment so that credentials do
    not have to be committed with the code.  The original example values are
    kept as fallbacks, so with no environment configured the call is the same
    as before.

    Environment variables:
    - CLICKHOUSE_HOST: server hostname
    - CLICKHOUSE_DATABASE: database name
    - CLICKHOUSE_USERNAME: user name
    - CLICKHOUSE_PASSWORD: password

    Returns:
    The object returned by clickhouse_connect.dbapi.connect().
    """
    import os  # local import keeps the snippet self-contained

    env = os.environ
    return clickhouse_connect.dbapi.connect(
        host=env.get('CLICKHOUSE_HOST', 'clickhouse.example.com'),
        database=env.get('CLICKHOUSE_DATABASE', 'production'),
        username=env.get('CLICKHOUSE_USERNAME', 'app_user'),
        # Placeholder fallback only; set CLICKHOUSE_PASSWORD in real deployments.
        password=env.get('CLICKHOUSE_PASSWORD', 'secure_password'),
    )
class DatabaseManager:
    """Example database manager using DB-API interface.

    Holds one DB-API connection and opens a fresh cursor for every call, so
    no cursor state leaks between operations.  Instances can be used in a
    ``with`` statement, which closes the connection on exit even when the
    body raises.
    """

    def __init__(self, conn=None):
        """Store the connection this manager works on.

        Parameters:
        - conn: An already-open DB-API connection.  When omitted, a new one
          is obtained from get_connection().
        """
        self.conn = get_connection() if conn is None else conn

    def __enter__(self):
        """Return the manager itself for use as a context manager."""
        return self

    def __exit__(self, exc_type, exc, tb):
        """Close the connection; exceptions from the body are not suppressed."""
        self.close()
        return False

    def _run(self, sql, params, extract):
        """Execute *sql* on a new cursor and return ``extract(cursor)``.

        The cursor is closed whether or not execution or extraction raises.
        """
        cursor = self.conn.cursor()
        try:
            # Leave the parameters argument out when there are none: it is
            # optional in DB-API 2.0 and some drivers reject an explicit None.
            if params is None:
                cursor.execute(sql)
            else:
                cursor.execute(sql, params)
            return extract(cursor)
        finally:
            cursor.close()

    def execute_query(self, sql, params=None):
        """Execute query and return all results.

        Parameters:
        - sql: SQL statement string
        - params: Optional parameters for the statement

        Returns:
        Tuple of (all fetched rows, cursor description).
        """
        return self._run(
            sql, params, lambda cur: (cur.fetchall(), cur.description)
        )

    def execute_command(self, sql, params=None):
        """Execute command and return row count.

        Parameters:
        - sql: SQL statement string
        - params: Optional parameters for the statement

        Returns:
        The cursor's rowcount after execution.
        """
        return self._run(sql, params, lambda cur: cur.rowcount)

    def close(self):
        """Close database connection."""
        self.conn.close()
# Usage
db = DatabaseManager()
try:
    # Query data
    results, description = db.execute_query(
        "SELECT product_id, sum(quantity) as total FROM orders GROUP BY product_id"
    )

    # The first item of each description entry is the column name.
    column_names = [desc[0] for desc in description]
    print(f"Columns: {column_names}")

    for row in results:
        print(dict(zip(column_names, row)))
finally:
    # Release the connection even if the query or the printing fails.
    db.close()


import clickhouse_connect.dbapi
# Error handling: run a failing query, report the DB-API error separately
# from any other exception, and always release the cursor and connection.
conn = clickhouse_connect.dbapi.connect(host='localhost')
cursor = conn.cursor()

try:
    # Invalid query
    cursor.execute("SELECT * FROM non_existent_table")
except clickhouse_connect.dbapi.Error as e:
    print(f"Database error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")
finally:
    cursor.close()
    conn.close()


import clickhouse_connect.dbapi
from contextlib import contextmanager
@contextmanager
def get_cursor(host='localhost', **connect_kwargs):
    """Context manager for database cursor.

    Opens a new connection, yields a cursor created from it, and closes both
    when the ``with`` block exits, whether normally or through an exception.

    Parameters:
    - host: Server hostname passed to connect() (default: 'localhost')
    - **connect_kwargs: Additional keyword arguments passed to connect()

    Yields:
    A cursor created from the new connection.
    """
    conn = clickhouse_connect.dbapi.connect(host=host, **connect_kwargs)
    try:
        # Create the cursor inside the outer try so the connection is still
        # closed if cursor creation fails.
        cursor = conn.cursor()
        try:
            yield cursor
        finally:
            cursor.close()
    finally:
        # Runs even when cursor.close() raises.
        conn.close()
# Usage with context manager
with get_cursor() as cursor:
cursor.execute("SELECT database()")
current_db = cursor.fetchone()[0]
print(f"Current database: {current_db}")
cursor.execute("SELECT count() FROM system.tables")
table_count = cursor.fetchone()[0]
print(f"Tables in system: {table_count}")

Install with Tessl CLI:
npx tessl i tessl/pypi-clickhouse-connect