0
# SQLAlchemy Integration
1
2
SQLAlchemy dialect support for Impala, enabling ORM and core SQLAlchemy functionality with Impala and Hive backends. This integration allows developers to use familiar SQLAlchemy patterns for database operations.
3
4
## Capabilities
5
6
### Dialect Classes
7
8
SQLAlchemy dialects that provide the interface between SQLAlchemy and Impala/Hive.
9
10
```python { .api }
11
class ImpalaDialect:
12
"""
13
SQLAlchemy dialect for Impala.
14
15
Provides SQLAlchemy integration for Impala databases, supporting
16
core SQLAlchemy functionality including table reflection, query
17
construction, and result handling.
18
19
Attributes:
20
name (str): Dialect name 'impala'
21
driver (str): Driver name 'impyla'
22
supports_alter (bool): Whether ALTER statements are supported
23
max_identifier_length (int): Maximum identifier length (128)
24
supports_sane_rowcount (bool): Whether rowcount is reliable
25
supports_sane_multi_rowcount (bool): Whether multi-statement rowcount works
26
supports_native_decimal (bool): Whether native decimal is supported
27
default_schema_name (str): Default schema name
28
supports_default_values (bool): Whether DEFAULT values are supported
29
supports_sequences (bool): Whether sequences are supported
30
sequences_optional (bool): Whether sequences are optional
31
preexecute_autoincrement_sequences (bool): Autoincrement sequence behavior
32
postfetch_lastrowid (bool): Whether to fetch last row ID after insert
33
"""
34
35
class Impala4Dialect(ImpalaDialect):
36
"""
37
SQLAlchemy dialect for Impala 4.x.
38
39
Specialized dialect for Impala 4.x versions with enhanced
40
features and optimizations specific to newer Impala releases.
41
Inherits from ImpalaDialect with additional features.
42
"""
43
```
44
45
### Custom SQL Types
46
47
Impala-specific SQL types for proper data type mapping in SQLAlchemy.
48
49
```python { .api }
50
class TINYINT:
51
"""
52
Impala TINYINT data type.
53
54
Represents Impala's TINYINT type (8-bit signed integer) in SQLAlchemy.
55
Maps to Python int with range validation.
56
"""
57
58
class INT:
59
"""
60
Impala INT data type.
61
62
Represents Impala's INT type (32-bit signed integer) in SQLAlchemy.
63
Maps to Python int.
64
"""
65
66
class DOUBLE:
67
"""
68
Impala DOUBLE data type.
69
70
Represents Impala's DOUBLE type (64-bit floating point) in SQLAlchemy.
71
Maps to Python float.
72
"""
73
74
class STRING:
75
"""
76
Impala STRING data type.
77
78
Represents Impala's STRING type (variable-length string) in SQLAlchemy.
79
Maps to Python str.
80
"""
81
```
82
83
### Dialect Support Classes
84
85
Supporting classes for the Impala SQLAlchemy dialect implementation.
86
87
```python { .api }
88
class ImpalaDDLCompiler:
89
"""
90
DDL compiler for Impala dialect.
91
92
Handles compilation of Data Definition Language (DDL) statements
93
specific to Impala's SQL syntax and capabilities.
94
"""
95
96
class ImpalaTypeCompiler:
97
"""
98
Type compiler for Impala dialect.
99
100
Handles compilation of SQLAlchemy types to Impala-specific
101
type representations in SQL statements.
102
"""
103
104
class Impala4TypeCompiler(ImpalaTypeCompiler):
105
"""
106
Type compiler for Impala 4.x dialect.
107
108
Enhanced type compiler with support for Impala 4.x specific
109
data types and type representations.
110
"""
111
112
class ImpalaIdentifierPreparer:
113
"""
114
Identifier preparer for Impala dialect.
115
116
Handles proper quoting and escaping of identifiers (table names,
117
column names, etc.) according to Impala's identifier rules.
118
"""
119
120
class ImpalaExecutionContext:
121
"""
122
Execution context for Impala dialect.
123
124
Manages the execution context for SQLAlchemy operations,
125
including parameter binding and result processing.
126
"""
127
```
128
129
## Usage Examples
130
131
### Basic SQLAlchemy Engine Setup
132
133
```python
134
from sqlalchemy import create_engine, text
135
from sqlalchemy.orm import sessionmaker
136
137
# Create engine using impyla dialect
138
engine = create_engine(
139
'impala://impala-host:21050/default',
140
echo=True # Enable SQL logging
141
)
142
143
# Test connection
144
with engine.connect() as conn:
145
result = conn.execute(text("SELECT version()"))
146
version = result.fetchone()
147
print(f"Connected to: {version[0]}")
148
```
149
150
### Authentication with SQLAlchemy
151
152
```python
153
from sqlalchemy import create_engine
154
155
# Kerberos authentication
156
engine = create_engine(
157
'impala://impala-host:21050/default',
158
connect_args={
159
'auth_mechanism': 'GSSAPI',
160
'kerberos_service_name': 'impala'
161
}
162
)
163
164
# LDAP authentication
165
engine = create_engine(
166
'impala://username:password@impala-host:21050/default',
167
connect_args={
168
'auth_mechanism': 'LDAP'
169
}
170
)
171
172
# SSL connection
173
engine = create_engine(
174
'impala://impala-host:21050/default',
175
connect_args={
176
'use_ssl': True,
177
'ca_cert': '/path/to/ca-cert.pem'
178
}
179
)
180
```
181
182
### Table Reflection and Metadata
183
184
```python
185
from sqlalchemy import create_engine, MetaData, Table
186
from sqlalchemy.orm import sessionmaker
187
188
engine = create_engine('impala://impala-host:21050/sales_db')
189
190
# Reflect existing tables
191
metadata = MetaData()
192
metadata.reflect(bind=engine)
193
194
# Access reflected tables
195
customers = metadata.tables['customers']
196
orders = metadata.tables['orders']
197
198
print("Customers table columns:")
199
for column in customers.columns:
200
print(f" {column.name}: {column.type}")
201
202
# Query using reflected table
203
with engine.connect() as conn:
204
# Select all customers
205
result = conn.execute(customers.select().limit(10))
206
for row in result:
207
print(row)
208
```
209
210
### ORM Usage with Impala
211
212
```python
213
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Float
214
from sqlalchemy.ext.declarative import declarative_base
215
from sqlalchemy.orm import sessionmaker
216
from datetime import datetime
217
218
Base = declarative_base()
219
220
class Customer(Base):
221
__tablename__ = 'customers'
222
223
customer_id = Column(Integer, primary_key=True)
224
name = Column(String)
225
email = Column(String)
226
created_at = Column(DateTime)
227
228
class Order(Base):
229
__tablename__ = 'orders'
230
231
order_id = Column(Integer, primary_key=True)
232
customer_id = Column(Integer)
233
order_date = Column(DateTime)
234
total_amount = Column(Float)
235
status = Column(String)
236
237
# Setup
238
engine = create_engine('impala://impala-host:21050/ecommerce')
239
Session = sessionmaker(bind=engine)
240
session = Session()
241
242
# Query using ORM
243
recent_customers = session.query(Customer).filter(
244
Customer.created_at >= datetime(2023, 1, 1)
245
).limit(10).all()
246
247
for customer in recent_customers:
248
print(f"Customer: {customer.name} ({customer.email})")
249
250
# Aggregate queries
251
from sqlalchemy import func
252
253
monthly_sales = session.query(
254
func.date_trunc('month', Order.order_date).label('month'),
255
func.sum(Order.total_amount).label('total_sales'),
256
func.count(Order.order_id).label('order_count')
257
).group_by(
258
func.date_trunc('month', Order.order_date)
259
).order_by('month').all()
260
261
for month, sales, count in monthly_sales:
262
print(f"{month}: ${sales:,.2f} ({count} orders)")
263
264
session.close()
265
```
266
267
### Core SQLAlchemy Queries
268
269
```python
270
from sqlalchemy import create_engine, text, select, func, and_, or_
271
from sqlalchemy import MetaData, Table
272
273
engine = create_engine('impala://impala-host:21050/analytics')
274
275
# Reflect tables
276
metadata = MetaData()
277
metadata.reflect(bind=engine)
278
sales = metadata.tables['sales']
279
products = metadata.tables['products']
280
281
with engine.connect() as conn:
282
# Complex query with joins
283
query = select([
284
products.c.category,
285
func.sum(sales.c.amount).label('total_sales'),
286
func.count(sales.c.sale_id).label('sale_count'),
287
func.avg(sales.c.amount).label('avg_sale')
288
]).select_from(
289
sales.join(products, sales.c.product_id == products.c.product_id)
290
).where(
291
and_(
292
sales.c.sale_date >= '2023-01-01',
293
products.c.category.in_(['Electronics', 'Clothing', 'Books'])
294
)
295
).group_by(
296
products.c.category
297
).order_by(
298
func.sum(sales.c.amount).desc()
299
)
300
301
result = conn.execute(query)
302
303
print("Sales by Category:")
304
for row in result:
305
print(f"{row.category}: ${row.total_sales:,.2f} "
306
f"({row.sale_count} sales, avg: ${row.avg_sale:.2f})")
307
```
308
309
### Custom Types Usage
310
311
```python
312
from sqlalchemy import create_engine, Column, Table, MetaData
313
from impala.sqlalchemy import TINYINT, INT, DOUBLE, STRING
314
315
engine = create_engine('impala://impala-host:21050/test_db')
316
metadata = MetaData()
317
318
# Define table with Impala-specific types
319
sensor_data = Table('sensor_readings', metadata,
320
Column('sensor_id', INT, primary_key=True),
321
Column('reading_type', TINYINT), # 0-255 range
322
Column('temperature', DOUBLE),
323
Column('location', STRING),
324
Column('notes', STRING)
325
)
326
327
# Create table (if it doesn't exist)
328
with engine.connect() as conn:
329
# Note: CREATE TABLE IF NOT EXISTS may need to be done manually
330
# as Impala has specific syntax requirements
331
332
# Insert data using the defined types
333
conn.execute(sensor_data.insert().values([
334
{'sensor_id': 1, 'reading_type': 1, 'temperature': 23.5,
335
'location': 'Building A', 'notes': 'Normal reading'},
336
{'sensor_id': 2, 'reading_type': 2, 'temperature': 25.1,
337
'location': 'Building B', 'notes': 'Slightly elevated'}
338
]))
339
340
# Query back the data
341
result = conn.execute(sensor_data.select())
342
for row in result:
343
print(f"Sensor {row.sensor_id}: {row.temperature}°C at {row.location}")
344
```
345
346
### Advanced Features and Optimizations
347
348
```python
349
from sqlalchemy import create_engine, text
350
from sqlalchemy.pool import StaticPool
351
352
# Connection pooling configuration
353
engine = create_engine(
354
'impala://impala-host:21050/warehouse',
355
poolclass=StaticPool,
356
pool_size=10,
357
max_overflow=20,
358
pool_pre_ping=True, # Validate connections
359
connect_args={
360
'timeout': 60,
361
'retries': 3
362
}
363
)
364
365
# Partition-aware queries
366
with engine.connect() as conn:
367
# Query with partition pruning
368
partitioned_query = text("""
369
SELECT
370
product_category,
371
SUM(sales_amount) as total_sales
372
FROM sales_partitioned
373
WHERE
374
year = :year
375
AND month = :month
376
GROUP BY product_category
377
""")
378
379
result = conn.execute(partitioned_query, year=2023, month=6)
380
381
for row in result:
382
print(f"{row.product_category}: ${row.total_sales:,.2f}")
383
384
# Bulk operations
385
def bulk_insert_with_sqlalchemy(engine, table_name, data_rows):
386
"""Efficient bulk insert using SQLAlchemy."""
387
388
metadata = MetaData()
389
metadata.reflect(bind=engine)
390
table = metadata.tables[table_name]
391
392
with engine.connect() as conn:
393
# Use bulk insert for better performance
394
conn.execute(table.insert(), data_rows)
395
print(f"Inserted {len(data_rows)} rows into {table_name}")
396
397
# Usage
398
sample_data = [
399
{'product_id': 1, 'name': 'Widget A', 'price': 19.99},
400
{'product_id': 2, 'name': 'Widget B', 'price': 24.99},
401
{'product_id': 3, 'name': 'Widget C', 'price': 29.99}
402
]
403
404
bulk_insert_with_sqlalchemy(engine, 'products', sample_data)
405
```
406
407
### Connection URI Examples
408
409
```python
410
# Basic connection
411
engine = create_engine('impala://hostname:21050/database')
412
413
# With authentication
414
engine = create_engine('impala://user:pass@hostname:21050/database')
415
416
# With SSL
417
engine = create_engine('impala://hostname:21050/database?use_ssl=true')
418
419
# HTTP transport
420
engine = create_engine(
421
'impala://hostname:28000/database',
422
connect_args={
423
'use_http_transport': True,
424
'http_path': 'cliservice'
425
}
426
)
427
428
# Multiple connection parameters
429
engine = create_engine(
430
'impala://hostname:21050/database',
431
connect_args={
432
'auth_mechanism': 'GSSAPI',
433
'kerberos_service_name': 'impala',
434
'use_ssl': True,
435
'timeout': 120
436
}
437
)
438
```