0
# SQLAlchemy Integration
1
2
Full SQLAlchemy dialect support enabling both Core and ORM usage patterns with Druid as a backend database. The integration provides table introspection, query compilation, and type mapping for seamless integration with SQLAlchemy applications.
3
4
## Capabilities
5
6
### Dialect Registration
7
8
PyDruid automatically registers SQLAlchemy dialects through entry points.
9
10
```python { .api }
11
# Available dialect URLs
12
druid://host:port/path/to/sql/endpoint
13
druid+http://host:port/path/to/sql/endpoint
14
druid+https://host:port/path/to/sql/endpoint
15
```
16
17
### Engine Creation
18
19
Create SQLAlchemy engines for Druid connectivity.
20
21
```python { .api }
22
from sqlalchemy import create_engine
23
24
# Basic HTTP connection
25
engine = create_engine('druid://localhost:8082/druid/v2/sql/')
26
27
# Explicit HTTP (equivalent to above)
28
engine = create_engine('druid+http://localhost:8082/druid/v2/sql/')
29
30
# HTTPS connection
31
engine = create_engine('druid+https://localhost:8082/druid/v2/sql/')
32
33
# With authentication
34
engine = create_engine('druid://username:password@localhost:8082/druid/v2/sql/')
35
36
# With query parameters for configuration
37
engine = create_engine('druid://localhost:8082/druid/v2/sql/?header=true&timeout=60000')
38
```
39
40
### Dialect Classes
41
42
Core dialect implementation classes.
43
44
```python { .api }
45
class DruidDialect:
46
"""Main SQLAlchemy dialect for Druid."""
47
48
name: str = "druid"
49
scheme: str = "http"
50
51
def dbapi(self) -> type:
52
"""Return the DB API module for this dialect."""
53
54
def create_connect_args(self, url) -> tuple:
55
"""
56
Build connection arguments from SQLAlchemy URL.
57
58
Parameters:
59
- url: SQLAlchemy URL object
60
61
Returns:
62
Tuple of (args, kwargs) for connect() function
63
"""
64
65
def do_ping(self, dbapi_connection) -> bool:
66
"""
67
Test if connection is alive.
68
69
Parameters:
70
- dbapi_connection: DB API connection object
71
72
Returns:
73
True if connection is alive, False otherwise
74
"""
75
76
def get_schema_names(self, connection, **kwargs) -> list:
77
"""Get list of schema names."""
78
79
def get_table_names(self, connection, schema: str = None, **kwargs) -> list:
80
"""
81
Get list of table names in schema.
82
83
Parameters:
84
- connection: SQLAlchemy connection
85
- schema: Schema name (optional)
86
87
Returns:
88
List of table names
89
"""
90
91
def get_columns(self, connection, table_name: str, schema: str = None, **kwargs) -> list:
92
"""
93
Get column information for a table.
94
95
Parameters:
96
- connection: SQLAlchemy connection
97
- table_name: Name of table to introspect
98
- schema: Schema name (optional)
99
100
Returns:
101
List of column dictionaries with metadata
102
"""
103
104
class DruidHTTPDialect(DruidDialect):
105
"""HTTP-specific Druid dialect (alias for DruidDialect)."""
106
scheme: str = "http"
107
108
class DruidHTTPSDialect(DruidDialect):
109
"""HTTPS-specific Druid dialect."""
110
scheme: str = "https"
111
```
112
113
### SQL Compilation
114
115
Custom SQL compilation for Druid-specific features.
116
117
```python { .api }
118
class DruidCompiler(compiler.SQLCompiler):
119
"""SQL compiler for Druid dialect (inherits from SQLAlchemy SQLCompiler)."""
120
pass
121
122
class DruidTypeCompiler(compiler.GenericTypeCompiler):
123
"""Type compiler for mapping SQLAlchemy types to Druid types."""
124
125
def visit_REAL(self, type_, **kwargs) -> str:
126
"""Map REAL type to Druid DOUBLE."""
127
return "DOUBLE"
128
129
def visit_NUMERIC(self, type_, **kwargs) -> str:
130
"""Map NUMERIC type to Druid LONG."""
131
return "LONG"
132
133
def visit_CHAR(self, type_, **kwargs) -> str:
134
"""Map CHAR type to Druid STRING."""
135
return "STRING"
136
137
def visit_DATETIME(self, type_, **kwargs) -> str:
138
"""Map DATETIME type to Druid LONG."""
139
return "LONG"
140
141
def visit_BLOB(self, type_, **kwargs) -> str:
142
"""Map BLOB type to Druid COMPLEX."""
143
return "COMPLEX"
144
145
class DruidIdentifierPreparer(compiler.IdentifierPreparer):
146
"""Identifier preparation for Druid SQL with universal reserved words."""
147
reserved_words = UniversalSet()
148
149
class UniversalSet:
150
"""Set that contains all items (no reserved words are enforced)."""
151
152
def __contains__(self, item) -> bool:
153
"""Always returns True - no reserved words."""
154
return True
155
```
156
157
### Type Mapping
158
159
JDBC type mapping for column introspection.
160
161
```python { .api }
162
jdbc_type_map: dict = {
163
-6: types.BigInteger, # TINYINT
164
-5: types.BigInteger, # BIGINT
165
1: types.String, # CHAR
166
3: types.Float, # DECIMAL
167
4: types.BigInteger, # INTEGER
168
5: types.BigInteger, # SMALLINT
169
6: types.Float, # FLOAT
170
7: types.Float, # REAL
171
8: types.Float, # DOUBLE
172
12: types.String, # VARCHAR
173
16: types.Boolean, # BOOLEAN
174
91: types.DATE, # DATE
175
93: types.TIMESTAMP, # TIMESTAMP
176
1111: types.BLOB, # OTHER
177
}
178
```
179
180
## Usage Examples
181
182
### Basic Engine Usage
183
184
```python
185
from sqlalchemy import create_engine, text
186
187
# Create engine
188
engine = create_engine('druid://localhost:8082/druid/v2/sql/')
189
190
# Execute raw SQL
191
with engine.connect() as conn:
192
result = conn.execute(text("SELECT COUNT(*) FROM places"))
193
count = result.scalar()
194
print(f"Total places: {count}")
195
```
196
197
### Table Reflection
198
199
```python
200
from sqlalchemy import create_engine, MetaData, Table, select
201
202
engine = create_engine('druid://localhost:8082/druid/v2/sql/')
203
metadata = MetaData()
204
205
# Reflect existing table structure
206
places = Table('places', metadata, autoload_with=engine)
207
208
# Inspect columns
209
for column in places.columns:
210
print(f"Column: {column.name}, Type: {column.type}")
211
212
# Execute query using reflected table
213
with engine.connect() as conn:
214
stmt = select(places.c.place_name).limit(10)
215
result = conn.execute(stmt)
216
for row in result:
217
print(row[0])
218
```
219
220
### Core Usage with Query Builder
221
222
```python
223
from sqlalchemy import create_engine, MetaData, Table, select, func, and_
224
225
engine = create_engine('druid://localhost:8082/druid/v2/sql/')
226
metadata = MetaData()
227
228
# Reflect tables
229
twitterstream = Table('twitterstream', metadata, autoload_with=engine)
230
231
# Build complex query
232
stmt = select(
233
twitterstream.c.user_name,
234
func.count().label('tweet_count')
235
).where(
236
and_(
237
twitterstream.c.user_lang == 'en',
238
twitterstream.c.__time >= '2014-03-01'
239
)
240
).group_by(
241
twitterstream.c.user_name
242
).order_by(
243
func.count().desc()
244
).limit(10)
245
246
# Execute query
247
with engine.connect() as conn:
248
result = conn.execute(stmt)
249
for row in result:
250
print(f"User: {row.user_name}, Tweets: {row.tweet_count}")
251
```
252
253
### Connection Configuration
254
255
```python
256
from sqlalchemy import create_engine
257
258
# With column headers (recommended for Druid >= 0.13.0)
259
engine = create_engine('druid://localhost:8082/druid/v2/sql/?header=true')
260
261
# With authentication
262
engine = create_engine('druid://user:pass@localhost:8082/druid/v2/sql/')
263
264
# HTTPS with SSL verification
265
engine = create_engine('druid+https://localhost:8082/druid/v2/sql/')
266
267
# With connection pool settings
268
engine = create_engine(
269
'druid://localhost:8082/druid/v2/sql/',
270
pool_size=10,
271
max_overflow=20,
272
pool_timeout=30
273
)
274
```
275
276
### Advanced Usage with ORM
277
278
```python
279
from sqlalchemy import create_engine, Column, String, Integer, DateTime
280
from sqlalchemy.ext.declarative import declarative_base
281
from sqlalchemy.orm import sessionmaker
282
283
Base = declarative_base()
284
285
class TwitterStream(Base):
286
__tablename__ = 'twitterstream'
287
288
# Define primary key (required for ORM, though Druid doesn't have real PKs)
289
__time = Column(DateTime, primary_key=True)
290
user_name = Column(String)
291
tweet_text = Column(String)
292
user_lang = Column(String)
293
294
engine = create_engine('druid://localhost:8082/druid/v2/sql/')
295
Session = sessionmaker(bind=engine)
296
297
# Note: ORM usage with Druid is limited due to Druid's nature as an analytical database
298
# Core usage is generally recommended for most use cases
299
```
300
301
### Error Handling
302
303
```python
304
from sqlalchemy import create_engine, text
305
from sqlalchemy.exc import SQLAlchemyError, DatabaseError
306
307
engine = create_engine('druid://localhost:8082/druid/v2/sql/')
308
309
try:
310
with engine.connect() as conn:
311
result = conn.execute(text("SELECT * FROM nonexistent_table"))
312
rows = result.fetchall()
313
except DatabaseError as e:
314
print(f"Database error: {e}")
315
except SQLAlchemyError as e:
316
print(f"SQLAlchemy error: {e}")
317
```
318
319
## Configuration Options
320
321
### URL Parameters
322
323
- **header**: Set to 'true' to request column headers (recommended for Druid >= 0.13.0)
324
- **timeout**: Query timeout in milliseconds
325
- **context**: JSON-encoded query context parameters
326
327
Example:
328
```python
329
engine = create_engine(
330
'druid://localhost:8082/druid/v2/sql/?header=true&timeout=60000&context={"timeout":60000}'
331
)
332
```
333
334
### Engine Options
335
336
Standard SQLAlchemy engine options apply:
337
- **pool_size**: Size of connection pool
338
- **max_overflow**: Maximum overflow connections
339
- **pool_timeout**: Connection timeout
340
- **echo**: Enable SQL logging
341
342
## Limitations
343
344
Due to Druid's nature as an analytical database:
345
346
- **No transactions**: COMMIT/ROLLBACK operations are no-ops
347
- **Read-only**: INSERT/UPDATE/DELETE operations are not supported
348
- **Limited ORM support**: Core usage is recommended over ORM
349
- **No foreign keys**: Druid doesn't support relational constraints
350
- **Time-based partitioning**: Tables are typically partitioned by time
351
352
The SQLAlchemy integration is optimized for analytical queries and data exploration rather than transactional operations.