# SQLAlchemy Integration

Complete SQLAlchemy dialect implementation enabling ORM capabilities, query building, and integration with SQL-based tools and frameworks. Provides seamless integration with the SQLAlchemy ecosystem for ClickHouse databases.

## Capabilities

### Dialect Registration

SQLAlchemy dialect for ClickHouse with automatic registration and connection URL support.

```python { .api }
# Dialect constants
dialect_name: str = 'clickhousedb'
"""SQLAlchemy dialect name for ClickHouse."""

ischema_names: dict[str, type]
"""Mapping of ClickHouse type names to SQLAlchemy types."""

class ClickHouseDialect:
    """
    SQLAlchemy dialect for ClickHouse database.

    Registered automatically as:
    - clickhousedb.connect
    - clickhousedb

    Connection URL format:
    clickhousedb://username:password@host:port/database
    """

    name: str = 'clickhousedb'
    supports_statement_cache: bool = True
    supports_native_boolean: bool = True
    supports_native_decimal: bool = True
    supports_unicode_statements: bool = True
    supports_unicode_binds: bool = True
    supports_default_values: bool = False
    supports_sequences: bool = False
    supports_native_enum: bool = True

    # Transaction support (ClickHouse auto-commits)
    supports_transactions: bool = False
    autocommit: bool = True

    # Schema reflection capabilities
    supports_reflection: bool = True
    supports_views: bool = True
    supports_indexes: bool = False  # ClickHouse uses a different indexing model
```
### Type System Integration

Comprehensive mapping between ClickHouse data types and the SQLAlchemy type system, with support for complex types and nullable columns.

```python { .api }
# ClickHouse to SQLAlchemy type mappings
from sqlalchemy import types as sqltypes

ischema_names = {
    # Integer types
    'Int8': sqltypes.SmallInteger,
    'Int16': sqltypes.SmallInteger,
    'Int32': sqltypes.Integer,
    'Int64': sqltypes.BigInteger,
    'UInt8': sqltypes.SmallInteger,
    'UInt16': sqltypes.Integer,
    'UInt32': sqltypes.BigInteger,
    'UInt64': sqltypes.Numeric,

    # Floating point types
    'Float32': sqltypes.Float,
    'Float64': sqltypes.Float,
    'Decimal': sqltypes.Numeric,

    # String types
    'String': sqltypes.String,
    'FixedString': sqltypes.CHAR,
    'LowCardinality(String)': sqltypes.String,

    # Date/time types
    'Date': sqltypes.Date,
    'DateTime': sqltypes.DateTime,
    'DateTime64': sqltypes.DateTime,

    # Boolean type
    'Bool': sqltypes.Boolean,

    # Array and complex types
    'Array': sqltypes.ARRAY,
    'Tuple': sqltypes.JSON,
    'Map': sqltypes.JSON,
    'Nested': sqltypes.JSON,

    # Special types
    'UUID': sqltypes.String,
    'IPv4': sqltypes.String,
    'IPv6': sqltypes.String,
    'Enum8': sqltypes.Enum,
    'Enum16': sqltypes.Enum,

    # Nullable wrapper
    'Nullable': sqltypes.TypeDecorator,
}
```
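To illustrate how a mapping like `ischema_names` is typically consulted, here is a minimal, self-contained sketch of resolving a reflected ClickHouse type string to a SQLAlchemy type name. The `resolve_type` helper and the reduced `TYPE_MAP` are hypothetical illustrations, not part of the dialect; the real implementation works with SQLAlchemy type objects rather than names.

```python
# Hypothetical helper showing how a reflected ClickHouse type string could
# be resolved against a mapping like ischema_names. Names are illustrative.
import re

TYPE_MAP = {
    'Int32': 'Integer',
    'Int64': 'BigInteger',
    'String': 'String',
    'DateTime': 'DateTime',
    'DateTime64': 'DateTime',
}

def resolve_type(spec: str) -> tuple[str, bool]:
    """Return (sqlalchemy_type_name, nullable) for a ClickHouse type spec."""
    nullable = False
    # Unwrap Nullable(...) -- ClickHouse expresses nullability as a wrapper type
    m = re.fullmatch(r'Nullable\((.*)\)', spec)
    if m:
        nullable = True
        spec = m.group(1)
    # Unwrap LowCardinality(...), which does not change the logical type
    m = re.fullmatch(r'LowCardinality\((.*)\)', spec)
    if m:
        spec = m.group(1)
    # Drop type parameters, e.g. DateTime64(3) -> DateTime64
    base = spec.split('(')[0]
    # Fall back to String for unmapped types, as many dialects do
    return TYPE_MAP.get(base, 'String'), nullable

print(resolve_type('Nullable(Int64)'))        # ('BigInteger', True)
print(resolve_type('LowCardinality(String)')) # ('String', False)
```

Unwrapping `Nullable(...)` first matters: the nullability flag feeds the `nullable` key of reflected column dictionaries, while the inner type drives the SQLAlchemy type lookup.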
### Connection Management

SQLAlchemy engine and connection pool integration with ClickHouse-specific connection handling and configuration options.

```python { .api }
def create_connect_args(self, url):
    """
    Build connection arguments from a SQLAlchemy URL.

    Parameters:
    - url: SQLAlchemy URL object

    Returns:
    Tuple of (args, kwargs) for connection creation

    URL format examples:
    clickhousedb://user:pass@host:port/database
    clickhousedb+http://user:pass@host:8123/database
    clickhousedb+https://user:pass@host:8443/database?secure=true
    """

def do_connect(self, cparams):
    """
    Create a ClickHouse connection using clickhouse-connect.

    Parameters:
    - cparams: Connection parameters from create_connect_args()

    Returns:
    ClickHouse connection object wrapped for SQLAlchemy
    """

def do_close(self, dbapi_connection):
    """Close the ClickHouse connection."""

def do_ping(self, dbapi_connection) -> bool:
    """Test connection health."""
```
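The following sketch shows the kind of transformation `create_connect_args()` performs: splitting a connection URL into keyword arguments for the underlying client. It is an illustration using only the standard library, not the dialect's actual code; the function name and the exact keyword names are assumptions.

```python
# Illustrative sketch (not the dialect's implementation) of turning a
# connection URL into keyword arguments for a ClickHouse client.
from urllib.parse import urlsplit, parse_qs

def url_to_connect_kwargs(url: str) -> dict:
    parts = urlsplit(url)
    kwargs = {
        'username': parts.username or 'default',
        'password': parts.password or '',
        'host': parts.hostname or 'localhost',
        'port': parts.port or 8123,
        'database': parts.path.lstrip('/') or 'default',
    }
    # Query-string options (e.g. ?secure=true) pass through as extra settings
    for key, values in parse_qs(parts.query).items():
        kwargs[key] = values[0]
    return kwargs

kwargs = url_to_connect_kwargs(
    'clickhousedb://analytics:secret@ch.example.com:8443/metrics?secure=true')
print(kwargs['host'], kwargs['port'], kwargs['database'], kwargs['secure'])
```

Defaulting the username to `default`, the port to 8123, and the database to `default` mirrors ClickHouse's own out-of-the-box defaults.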
### Schema Reflection

Automatic schema discovery and table reflection capabilities for introspecting ClickHouse database structure.

```python { .api }
def get_schema_names(self, connection, **kwargs) -> list[str]:
    """
    Get the list of schema (database) names.

    Returns:
    List of database names available on the server
    """

def get_table_names(self, connection, schema=None, **kwargs) -> list[str]:
    """
    Get the list of table names in the specified schema.

    Parameters:
    - connection: SQLAlchemy connection
    - schema: Database name (None for default)

    Returns:
    List of table names in the schema
    """

def get_view_names(self, connection, schema=None, **kwargs) -> list[str]:
    """
    Get the list of view names in the specified schema.

    Parameters:
    - connection: SQLAlchemy connection
    - schema: Database name (None for default)

    Returns:
    List of view names in the schema
    """

def get_columns(self, connection, table_name, schema=None, **kwargs) -> list[dict]:
    """
    Get column information for the specified table.

    Parameters:
    - connection: SQLAlchemy connection
    - table_name: Name of the table
    - schema: Database name (None for default)

    Returns:
    List of column dictionaries with keys:
    - name: Column name
    - type: SQLAlchemy type object
    - nullable: Boolean nullable flag
    - default: Default value (if any)
    - comment: Column comment (if any)
    """

def get_pk_constraint(self, connection, table_name, schema=None, **kwargs) -> dict:
    """
    Get primary key constraint information.

    Note: ClickHouse doesn't have traditional primary keys;
    returns empty constraint info for compatibility.
    """

def get_foreign_keys(self, connection, table_name, schema=None, **kwargs) -> list:
    """
    Get foreign key constraints.

    Note: ClickHouse doesn't support foreign keys;
    returns an empty list for compatibility.
    """

def get_indexes(self, connection, table_name, schema=None, **kwargs) -> list:
    """
    Get index information.

    Note: ClickHouse uses different indexing mechanisms;
    returns an empty list for standard indexes.
    """
```
### DDL Compilation

SQL DDL statement compilation for CREATE TABLE, DROP TABLE, and other schema modification operations.

```python { .api }
def visit_create_table(self, create, **kwargs) -> str:
    """
    Compile a CREATE TABLE statement for ClickHouse.

    Handles ClickHouse-specific table engines, partitioning,
    and other table creation options.
    """

def visit_drop_table(self, drop, **kwargs) -> str:
    """Compile a DROP TABLE statement."""

def visit_create_index(self, create, **kwargs) -> str:
    """
    Handle index creation.

    Note: Translates to ClickHouse-appropriate indexing where applicable.
    """
```
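As a rough sketch of what `visit_create_table` has to produce beyond standard SQL, the snippet below builds a CREATE TABLE string with the ClickHouse-specific `ENGINE` and `ORDER BY` clauses appended. The helper and its signature are hypothetical; the real compiler works from SQLAlchemy's compiled element tree, not plain dicts.

```python
# Hypothetical sketch of the shape of SQL a ClickHouse DDL compiler emits:
# a standard column list plus ClickHouse-specific ENGINE / ORDER BY clauses.
def compile_create_table(name: str, columns: dict[str, str],
                         engine: str = 'MergeTree',
                         order_by: str = 'tuple()') -> str:
    cols = ',\n    '.join(f'{col} {ctype}' for col, ctype in columns.items())
    return (
        f'CREATE TABLE {name} (\n    {cols}\n) '
        f'ENGINE = {engine} ORDER BY {order_by}'
    )

ddl = compile_create_table(
    'events',
    {'id': 'UInt64', 'event_type': 'String', 'timestamp': 'DateTime'},
    order_by='(timestamp, id)',
)
print(ddl)
```

The `ENGINE` clause is mandatory in ClickHouse DDL, which is why table creation through SQLAlchemy may need extra, dialect-specific arguments rather than plain `Table` metadata.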
## Usage Examples

### Basic SQLAlchemy Engine

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import sessionmaker

# Create engine with connection URL
engine = create_engine('clickhousedb://default@localhost:8123/default')

# Test connection
with engine.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    version = result.scalar()
    print(f"ClickHouse version: {version}")

# Create session factory
Session = sessionmaker(bind=engine)
session = Session()

# Execute raw SQL
result = session.execute(text("""
    SELECT
        database,
        count() as table_count
    FROM system.tables
    GROUP BY database
"""))

for row in result:
    print(f"Database {row.database}: {row.table_count} tables")

session.close()
```
### Table Definition and ORM

```python
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class Event(Base):
    """SQLAlchemy model for the events table."""
    __tablename__ = 'events'

    id = Column(Integer, primary_key=True)
    event_type = Column(String(100), nullable=False)
    user_id = Column(Integer, nullable=False)
    timestamp = Column(DateTime, nullable=False)
    value = Column(Float)

    def __repr__(self):
        return f"<Event(id={self.id}, type='{self.event_type}', user={self.user_id})>"

# Create engine and tables
engine = create_engine('clickhousedb://default@localhost:8123/analytics')

# Note: ClickHouse table creation may require additional engine parameters.
# This creates the table structure for SQLAlchemy to work with.
Base.metadata.create_all(engine)

# Work with the ORM
Session = sessionmaker(bind=engine)
session = Session()

# Query using the ORM
events = session.query(Event).filter(
    Event.event_type == 'click'
).limit(100).all()

for event in events:
    print(f"{event.timestamp}: {event.event_type} by user {event.user_id}")

session.close()
```
### Schema Reflection

```python
from sqlalchemy import create_engine, inspect, MetaData, Table

engine = create_engine('clickhousedb://default@localhost:8123/system')

# Reflect schema information
inspector = inspect(engine)

# Get database names
schemas = inspector.get_schema_names()
print(f"Available databases: {schemas}")

# Get tables in the system database
tables = inspector.get_table_names(schema='system')
print(f"System tables: {len(tables)}")

# Reflect a specific table's structure
table_columns = inspector.get_columns('tables', schema='system')
print("\nColumns in system.tables:")
for col in table_columns:
    print(f"  {col['name']}: {col['type']} ({'nullable' if col['nullable'] else 'not null'})")

# Use a reflected table
metadata = MetaData()
tables_table = Table('tables', metadata, autoload_with=engine, schema='system')

with engine.connect() as conn:
    # Query using the reflected table
    result = conn.execute(tables_table.select().where(
        tables_table.c.database == 'default'
    ))

    for row in result:
        print(f"Table: {row.name}, Engine: {row.engine}")
```
### Advanced Queries with SQLAlchemy Core

```python
from sqlalchemy import create_engine, text, select, func, and_, desc
from sqlalchemy.sql import column, table

engine = create_engine('clickhousedb://default@localhost:8123/analytics')

# Define a table structure for queries (without the ORM)
events = table('events',
    column('id'),
    column('event_type'),
    column('user_id'),
    column('timestamp'),
    column('value')
)

with engine.connect() as conn:
    # SQLAlchemy Core query building (2.0-style select())
    query = select(
        events.c.event_type,
        func.count().label('event_count'),
        func.avg(events.c.value).label('avg_value')
    ).where(
        and_(
            events.c.timestamp >= '2023-01-01',
            events.c.value.isnot(None)
        )
    ).group_by(events.c.event_type).order_by(desc('event_count'))

    result = conn.execute(query)

    for row in result:
        print(f"{row.event_type}: {row.event_count} events, avg value: {row.avg_value:.2f}")

    # Raw SQL with bound parameters
    complex_query = text("""
        SELECT
            toYYYYMM(timestamp) as month,
            event_type,
            uniq(user_id) as unique_users,
            count() as total_events
        FROM events
        WHERE timestamp >= :start_date
          AND timestamp < :end_date
        GROUP BY month, event_type
        ORDER BY month, total_events DESC
    """)

    result = conn.execute(complex_query, {
        'start_date': '2023-01-01',
        'end_date': '2023-07-01'
    })

    for row in result:
        print(f"{row.month}: {row.event_type} - {row.unique_users} users, {row.total_events} events")
```
### Connection Pool Configuration

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Engine with a custom connection pool
engine = create_engine(
    'clickhousedb://analytics_user:password@clickhouse.example.com:8123/analytics',
    poolclass=QueuePool,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,   # Validate connections before use
    pool_recycle=3600,    # Recycle connections after 1 hour
    connect_args={
        'compress': 'lz4',
        'settings': {
            'max_threads': 4,
            'max_memory_usage': '2G'
        }
    }
)

# Connection health check
def check_connection_health():
    try:
        with engine.connect() as conn:
            result = conn.execute(text("SELECT 1"))
            return result.scalar() == 1
    except Exception as e:
        print(f"Connection health check failed: {e}")
        return False

if check_connection_health():
    print("Database connection is healthy")
else:
    print("Database connection issues detected")
```
### Integration with Data Analysis Tools

```python
import pandas as pd
from sqlalchemy import create_engine

# Create an engine for pandas integration
engine = create_engine('clickhousedb://default@localhost:8123/analytics')

# Load a SQLAlchemy query into a pandas DataFrame
df = pd.read_sql_query("""
    SELECT
        user_id,
        event_type,
        COUNT(*) as event_count,
        AVG(value) as avg_value,
        MAX(timestamp) as last_event
    FROM events
    WHERE timestamp >= '2023-01-01'
    GROUP BY user_id, event_type
    ORDER BY event_count DESC
""", engine)

print(f"Loaded {len(df)} rows into DataFrame")
print(df.head())

# Data analysis with pandas
top_users = df.groupby('user_id')['event_count'].sum().nlargest(10)
print("\nTop 10 users by event count:")
print(top_users)

# Write a DataFrame back to ClickHouse via SQLAlchemy
processed_df = df.groupby('event_type').agg({
    'event_count': 'sum',
    'avg_value': 'mean',
    'user_id': 'nunique'
}).reset_index()

processed_df.columns = ['event_type', 'total_events', 'mean_value', 'unique_users']

# Insert the processed data
processed_df.to_sql(
    'event_summary',
    engine,
    if_exists='replace',
    index=False,
    method='multi'  # Batch insert
)

print("Processed data written to event_summary table")
```