Python driver with native interface for ClickHouse database providing high-performance connectivity and comprehensive data type support.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive exception hierarchy providing detailed error information for both ClickHouse-specific operations and standard database errors. The driver includes both native ClickHouse exceptions with server error codes and DB API 2.0 compliant exceptions for standards-based error handling.
Native exceptions specific to ClickHouse operations with detailed error codes and nested exception support.
class Error(Exception):
"""Base exception class for all ClickHouse driver errors."""
class ServerException(Error):
"""
Server-side errors with ClickHouse error codes.
Attributes:
- code: ClickHouse error code (integer)
- message: Error message from server
- nested: Nested exception if applicable
"""
def __init__(self, message, code=None, nested=None):
super().__init__(message)
self.code = code
self.nested = nested
class LogicalError(Error):
"""Logic errors in query construction or execution."""
class UnknownTypeError(Error):
"""Unknown or unsupported ClickHouse data type."""
class ChecksumDoesntMatchError(Error):
"""Data integrity error during transmission."""
class TypeMismatchError(Error):
"""Type conversion or casting errors."""
class UnknownCompressionMethod(Error):
"""Unsupported compression algorithm."""
class TooLargeStringSize(Error):
"""String size exceeds maximum allowed length."""Errors related to network connectivity and connection management.
class NetworkError(Error):
"""Network connectivity errors."""
class SocketTimeoutError(NetworkError):
"""Socket operation timeout."""
class UnexpectedPacketFromServerError(Error):
"""Protocol violation or unexpected server response."""
class UnknownPacketFromServerError(UnexpectedPacketFromServerError):
"""Unknown packet type received from server."""Errors during data parsing, conversion, and processing operations.
class CannotParseUuidError(Error):
"""UUID parsing or format errors."""
class CannotParseDomainError(Error):
"""Domain name parsing errors."""
class PartiallyConsumedQueryError(Error):
"""
Query result not fully consumed.
Raised when attempting new operations while previous
query results are still being processed.
"""ClickHouse server error codes for programmatic error handling.
class ErrorCodes:
"""
ClickHouse server error code constants.
Common error codes:
- SYNTAX_ERROR = 62
- UNKNOWN_TABLE = 60
- UNKNOWN_COLUMN = 47
- MEMORY_LIMIT_EXCEEDED = 241
- TIMEOUT_BEFORE_PROCESSING_CONNECTION = 159
- AUTHENTICATION_FAILED = 516
- DATABASE_ACCESS_DENIED = 271
- TABLE_ALREADY_EXISTS = 57
- COLUMN_ALREADY_EXISTS = 44
"""
# Network and connection errors
NETWORK_ERROR = 210
SOCKET_TIMEOUT = 209
CONNECTION_LOST = 210
# Authentication and authorization
AUTHENTICATION_FAILED = 516
DATABASE_ACCESS_DENIED = 271
READONLY = 164
# Query syntax and logic errors
SYNTAX_ERROR = 62
LOGICAL_ERROR = 170
BAD_ARGUMENTS = 36
# Schema and metadata errors
UNKNOWN_DATABASE = 81
UNKNOWN_TABLE = 60
UNKNOWN_COLUMN = 47
TABLE_ALREADY_EXISTS = 57
COLUMN_ALREADY_EXISTS = 44
# Resource limits
MEMORY_LIMIT_EXCEEDED = 241
TIMEOUT_EXCEEDED = 159
QUERY_WAS_CANCELLED = 394
# Data type and conversion errors
TYPE_MISMATCH = 53
CANNOT_CONVERT_TYPE = 70
CANNOT_PARSE_NUMBER = 27
CANNOT_PARSE_DATE = 38Standard database exceptions following Python DB API 2.0 specification for compatibility with database frameworks.
# Exception hierarchy following DB API 2.0
class Warning(Exception):
"""Warning category for non-fatal issues."""
class Error(Exception):
"""Base class for all database-related errors."""
class InterfaceError(Error):
"""
Database interface errors.
Raised for problems with the database interface rather
than the database itself.
"""
class DatabaseError(Error):
"""
Database-related errors.
Base class for errors related to the database operation.
"""
class DataError(DatabaseError):
"""
Data processing errors.
Problems with the processed data like division by zero,
numeric value out of range, etc.
"""
class OperationalError(DatabaseError):
"""
Operational database errors.
Errors related to database operation, not under user control.
Examples: connection lost, database name not found, etc.
"""
class IntegrityError(DatabaseError):
"""
Database integrity constraint violations.
Relational integrity of the database is affected.
"""
class InternalError(DatabaseError):
"""
Internal database errors.
Database system internal errors.
"""
class ProgrammingError(DatabaseError):
"""
Programming errors.
SQL syntax errors, table not found, wrong parameter count, etc.
"""
class NotSupportedError(DatabaseError):
"""
Unsupported database operations.
Method or database API not supported by the database.
"""from clickhouse_driver import Client
from clickhouse_driver.errors import Error, ServerException, NetworkError
client = Client('localhost')
try:
result = client.execute('SELECT invalid_function()')
except ServerException as e:
print(f"ClickHouse server error: {e}")
print(f"Error code: {e.code}")
if e.nested:
print(f"Nested error: {e.nested}")
except NetworkError as e:
print(f"Network error: {e}")
except Error as e:
print(f"General ClickHouse error: {e}")from clickhouse_driver import Client
from clickhouse_driver.errors import ServerException, ErrorCodes
client = Client('localhost')
try:
client.execute('SELECT * FROM nonexistent_table')
except ServerException as e:
if e.code == ErrorCodes.UNKNOWN_TABLE:
print("Table doesn't exist - creating it...")
client.execute('''
CREATE TABLE nonexistent_table (
id UInt32,
name String
) ENGINE = Memory
''')
elif e.code == ErrorCodes.DATABASE_ACCESS_DENIED:
print("Access denied - check permissions")
elif e.code == ErrorCodes.SYNTAX_ERROR:
print(f"SQL syntax error: {e}")
else:
print(f"Other server error {e.code}: {e}")from clickhouse_driver import connect
from clickhouse_driver.dbapi.errors import (
DatabaseError, OperationalError, ProgrammingError,
DataError, IntegrityError
)
try:
with connect(host='localhost') as conn:
with conn.cursor() as cursor:
cursor.execute('SELECT * FROM invalid_table')
except ProgrammingError as e:
print(f"SQL programming error: {e}")
except OperationalError as e:
print(f"Database operational error: {e}")
except DataError as e:
print(f"Data processing error: {e}")
except IntegrityError as e:
print(f"Data integrity error: {e}")
except DatabaseError as e:
print(f"General database error: {e}")from clickhouse_driver import Client
from clickhouse_driver.errors import NetworkError, SocketTimeoutError
# Robust connection with retry logic
def connect_with_retry(max_retries=3):
for attempt in range(max_retries):
try:
client = Client(
'localhost',
connect_timeout=5,
send_receive_timeout=30
)
# Test connection
client.execute('SELECT 1')
return client
except SocketTimeoutError:
print(f"Connection timeout, attempt {attempt + 1}")
if attempt == max_retries - 1:
raise
except NetworkError as e:
print(f"Network error: {e}, attempt {attempt + 1}")
if attempt == max_retries - 1:
raise
time.sleep(2 ** attempt) # Exponential backoff
client = connect_with_retry()from clickhouse_driver import Client
from clickhouse_driver.errors import TypeMismatchError, UnknownTypeError
client = Client('localhost')
try:
# Enable strict type checking
result = client.execute(
'SELECT * FROM test_table WHERE id = %(id)s',
{'id': 'invalid_integer'}, # String instead of integer
types_check=True
)
except TypeMismatchError as e:
print(f"Type mismatch: {e}")
# Handle by converting types or fixing query
try:
# Query with unknown type in result
result = client.execute('SELECT some_new_clickhouse_type FROM test')
except UnknownTypeError as e:
print(f"Unknown ClickHouse type: {e}")
# May need driver update or custom type handlingfrom clickhouse_driver import Client
from clickhouse_driver.errors import ServerException, ErrorCodes
client = Client('localhost')
try:
# Large query that might exceed memory limits
result = client.execute('''
SELECT huge_column, COUNT(*)
FROM massive_table
GROUP BY huge_column
''', settings={
'max_memory_usage': 1000000000, # 1GB limit
'max_execution_time': 300 # 5 minute timeout
})
except ServerException as e:
if e.code == ErrorCodes.MEMORY_LIMIT_EXCEEDED:
print("Query exceeded memory limit")
# Use streaming or reduce data scope
for block in client.execute_iter(
'SELECT huge_column FROM massive_table LIMIT 1000000'
):
process_block(block)
elif e.code == ErrorCodes.TIMEOUT_EXCEEDED:
print("Query timed out")
# Optimize query or increase timeout
elif e.code == ErrorCodes.QUERY_WAS_CANCELLED:
print("Query was cancelled")
# Handle cancellation gracefullyimport logging
from clickhouse_driver import Client
from clickhouse_driver.errors import Error, ServerException
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def execute_with_logging(client, query, params=None):
try:
logger.info(f"Executing query: {query[:100]}...")
result = client.execute(query, params)
logger.info(f"Query completed successfully, {len(result)} rows")
return result
except ServerException as e:
logger.error(f"ClickHouse server error {e.code}: {e}")
if e.nested:
logger.error(f"Nested error: {e.nested}")
raise
except Error as e:
logger.error(f"ClickHouse driver error: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
# Usage
client = Client('localhost')
try:
result = execute_with_logging(
client,
'SELECT count() FROM large_table WHERE date > %(date)s',
{'date': '2023-01-01'}
)
except Error:
# Error already logged
passfrom clickhouse_driver import Client
from clickhouse_driver.errors import ServerException, ErrorCodes
def query_with_fallback(client, primary_query, fallback_query=None):
"""Execute query with fallback on certain errors."""
try:
return client.execute(primary_query)
except ServerException as e:
if e.code in [ErrorCodes.UNKNOWN_TABLE, ErrorCodes.UNKNOWN_COLUMN]:
if fallback_query:
print(f"Primary query failed, trying fallback: {e}")
return client.execute(fallback_query)
else:
print(f"Schema error, no fallback available: {e}")
raise
else:
# Re-raise other server errors
raise
# Usage with schema evolution
client = Client('localhost')
# Try new schema first, fall back to old schema
result = query_with_fallback(
client,
'SELECT id, name, new_column FROM users', # New schema
'SELECT id, name, NULL as new_column FROM users' # Old schema fallback
)Install with Tessl CLI
npx tessl i tessl/pypi-clickhouse-driver