# SQLAlchemy Integration

A complete SQLAlchemy dialect implementation with custom Athena types, enabling ORM support and integration with existing SQLAlchemy-based applications. It provides a database abstraction layer over Athena.

## Installation

```bash
pip install PyAthena[SQLAlchemy]
```

## Capabilities

### Athena Dialects

Multiple dialect implementations, each optimized for a different use case and result format. The dialect is selected by the scheme portion of the connection URL (`name+driver`).

```python { .api }
class AthenaDialect:
    """Base Athena dialect for standard SQLAlchemy operations."""
    name: str = "awsathena"

class AthenaRestDialect(AthenaDialect):
    """REST API-based dialect for standard operations (awsathena+rest)."""
    driver: str = "rest"

class AthenaPandasDialect(AthenaDialect):
    """Pandas-optimized dialect for DataFrame operations (awsathena+pandas)."""
    driver: str = "pandas"

class AthenaArrowDialect(AthenaDialect):
    """Arrow-optimized dialect for columnar operations (awsathena+arrow)."""
    driver: str = "arrow"
```
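
For illustration, each dialect is chosen purely by the URL scheme; a minimal sketch (the region, host, and staging bucket below are placeholders):

```python
from sqlalchemy import create_engine

# Same Athena database, three result-processing strategies.
base = (
    "athena.us-west-2.amazonaws.com:443/default"
    "?s3_staging_dir=s3://my-bucket/athena-results/"
)
rest_engine = create_engine(f"awsathena+rest://:@{base}")      # tuple rows
pandas_engine = create_engine(f"awsathena+pandas://:@{base}")  # DataFrame-backed results
arrow_engine = create_engine(f"awsathena+arrow://:@{base}")    # Arrow-backed results
```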

### Custom Athena Types

SQLAlchemy type implementations for Athena-specific data types.

```python { .api }
class TINYINT(sqltypes.Integer):
    """Athena TINYINT type (8-bit integer)."""

class STRUCT(TypeEngine[Dict]):
    """Athena STRUCT type for nested objects."""

class MAP(TypeEngine[Dict]):
    """Athena MAP type for key-value pairs."""

class ARRAY(TypeEngine[List]):
    """Athena ARRAY type for ordered collections."""

class AthenaTimestamp(TypeEngine[datetime]):
    """Athena TIMESTAMP type with timezone support."""

class AthenaDate(TypeEngine[date]):
    """Athena DATE type."""
```
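
As a sketch of how these types compose in a table definition (reusing the constructor forms from the ORM example later in this page; the table and column names are illustrative):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from pyathena.sqlalchemy.types import ARRAY, MAP, STRUCT, TINYINT

metadata = MetaData()

# Columns mixing standard SQLAlchemy types with Athena-specific ones.
events = Table(
    "events",
    metadata,
    Column("event_id", Integer),
    Column("priority", TINYINT),                # 8-bit integer
    Column("tags", ARRAY(String)),              # ordered collection
    Column("attributes", MAP(String, String)),  # key-value pairs
    Column("payload", STRUCT([("source", String), ("score", Integer)])),
)
```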

### SQL Compilation

Custom compilers for translating SQLAlchemy constructs to Athena SQL.

```python { .api }
class AthenaStatementCompiler(SQLCompiler):
    """Compiles SQLAlchemy statements to Athena SQL."""

class AthenaDDLCompiler(DDLCompiler):
    """Compiles DDL statements for Athena."""

class AthenaTypeCompiler(GenericTypeCompiler):
    """Compiles SQLAlchemy types to Athena SQL types."""
```
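
These compilers are invoked implicitly whenever a statement is rendered against the dialect. A small sketch of inspecting their output (engine URL and table are placeholders):

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select

engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

metadata = MetaData()
users = Table("users", metadata, Column("id", Integer), Column("name", String))

# Render the statement through the Athena statement/type compilers.
stmt = select(users.c.name).where(users.c.id == 10)
print(stmt.compile(engine, compile_kwargs={"literal_binds": True}))
```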

### Identifier Preparation

Classes for properly formatting identifiers in different SQL contexts.

```python { .api }
class AthenaDMLIdentifierPreparer(IdentifierPreparer):
    """Prepares identifiers for DML statements."""

class AthenaDDLIdentifierPreparer(IdentifierPreparer):
    """Prepares identifiers for DDL statements."""
```
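
For illustration, SQLAlchemy exposes the active preparer on the dialect, so you can see how an identifier would be quoted (the reserved word below is just an example):

```python
from sqlalchemy import create_engine

engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

preparer = engine.dialect.identifier_preparer
print(preparer.quote("select"))       # reserved word, so it comes back quoted
print(preparer.quote("customer_id"))  # ordinary name, returned as-is
```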

## Usage Examples

### Basic SQLAlchemy Connection

```python
from sqlalchemy import create_engine, text

# Create engine with the Athena dialect; with empty credentials in the URL,
# they are resolved from the environment or instance profile
engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

# Test connection
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1 AS test_column"))
    print(result.fetchone())
```

### Advanced Connection Configuration

```python
from sqlalchemy import create_engine
from urllib.parse import quote_plus

# Connection string with all parameters; values that may contain
# special characters are URL-encoded with quote_plus
connection_params = {
    "aws_access_key_id": "YOUR_ACCESS_KEY",
    "aws_secret_access_key": quote_plus("YOUR_SECRET_KEY"),
    "region_name": "us-west-2",
    "schema_name": "default",
    "s3_staging_dir": quote_plus("s3://my-bucket/athena-results/"),
    "work_group": "primary",
    "catalog_name": "AwsDataCatalog",
}

# Build connection string
connection_string = (
    f"awsathena+rest://{connection_params['aws_access_key_id']}:"
    f"{connection_params['aws_secret_access_key']}@"
    f"athena.{connection_params['region_name']}.amazonaws.com:443/"
    f"{connection_params['schema_name']}?"
    f"s3_staging_dir={connection_params['s3_staging_dir']}&"
    f"work_group={connection_params['work_group']}&"
    f"catalog_name={connection_params['catalog_name']}"
)

engine = create_engine(connection_string)
```

### ORM Model Definition

```python
from sqlalchemy import Column, Integer, String, DateTime, Numeric, Boolean, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from pyathena.sqlalchemy.types import TINYINT, STRUCT, ARRAY, MAP

Base = declarative_base()

class Customer(Base):
    __tablename__ = 'customers'

    customer_id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True)
    age = Column(TINYINT)  # Athena-specific type
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime)
    total_spent = Column(Numeric(10, 2))

    # Complex types
    preferences = Column(MAP(String, String))  # Key-value preferences
    order_history = Column(ARRAY(Integer))     # Array of order IDs
    profile = Column(STRUCT([                  # Nested structure
        ('address', String),
        ('phone', String),
        ('preferences', MAP(String, String))
    ]))

class Order(Base):
    __tablename__ = 'orders'

    order_id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, nullable=False)
    order_date = Column(DateTime)
    amount = Column(Numeric(10, 2))
    status = Column(String(20))
    items = Column(ARRAY(STRUCT([  # Array of structured items
        ('product_id', Integer),
        ('quantity', Integer),
        ('price', Numeric(8, 2))
    ])))

# Create engine and session
engine = create_engine("awsathena+rest://...")
Session = sessionmaker(bind=engine)
session = Session()
```

### ORM Queries

```python
from sqlalchemy import func, and_
from datetime import datetime, timedelta

# Basic queries
active_customers = session.query(Customer).filter(Customer.is_active == True).all()

# Complex filtering
high_value_customers = session.query(Customer).filter(
    and_(
        Customer.total_spent > 1000,
        Customer.is_active == True,
        Customer.created_at > datetime.now() - timedelta(days=365)
    )
).all()

# Aggregation queries
customer_stats = session.query(
    func.count(Customer.customer_id).label('total_customers'),
    func.avg(Customer.total_spent).label('avg_spent'),
    func.max(Customer.total_spent).label('max_spent'),
    func.min(Customer.age).label('min_age'),
    func.max(Customer.age).label('max_age')
).first()

print(f"Total customers: {customer_stats.total_customers}")
print(f"Average spent: ${customer_stats.avg_spent:.2f}")

# Join queries
recent_orders = session.query(Customer, Order).join(
    Order, Customer.customer_id == Order.customer_id
).filter(
    Order.order_date > datetime.now() - timedelta(days=30)
).all()

# Group-by queries; date_format is a Presto/Trino function supported by Athena
monthly_revenue = session.query(
    func.date_format(Order.order_date, '%Y-%m').label('month'),
    func.sum(Order.amount).label('total_revenue'),
    func.count(Order.order_id).label('order_count')
).group_by(
    func.date_format(Order.order_date, '%Y-%m')
).order_by('month').all()

for row in monthly_revenue:
    print(f"Month: {row.month}, Revenue: ${row.total_revenue:.2f}, Orders: {row.order_count}")
```

### Working with Complex Types

```python
from sqlalchemy import text

# Query with complex type operations
complex_query = text("""
    SELECT
        customer_id,
        name,
        preferences['newsletter'] AS newsletter_pref,
        cardinality(order_history) AS total_orders,
        profile.address AS customer_address
    FROM customers
    WHERE preferences['vip'] = 'true'
      AND cardinality(order_history) > 5
""")

results = session.execute(complex_query).fetchall()
for row in results:
    print(f"Customer: {row.name}, Address: {row.customer_address}, Orders: {row.total_orders}")

# Insert with complex types; the flush issues an INSERT statement against Athena
new_customer = Customer(
    customer_id=12345,
    name="John Doe",
    email="john@example.com",
    age=35,
    preferences={
        'newsletter': 'true',
        'vip': 'false',
        'language': 'en'
    },
    order_history=[1001, 1002, 1003],
    profile={
        'address': '123 Main St, Anytown, USA',
        'phone': '+1-555-0123',
        'preferences': {
            'contact_method': 'email',
            'timezone': 'EST'
        }
    }
)

session.add(new_customer)
session.commit()
```

### Pandas Integration with SQLAlchemy

```python
from sqlalchemy import create_engine
import pandas as pd

# Use the pandas dialect for DataFrame operations
engine = create_engine("awsathena+pandas://...")

# Read query results directly into a DataFrame
df = pd.read_sql_query("""
    SELECT
        customer_id,
        name,
        total_spent,
        age,
        is_active
    FROM customers
    WHERE total_spent > 500
""", engine)

print(df.head())
print(f"DataFrame shape: {df.shape}")

# Use the DataFrame for analysis
customer_analysis = df.groupby('is_active').agg({
    'total_spent': ['mean', 'sum', 'count'],
    'age': ['mean', 'min', 'max']
}).round(2)

print("Customer Analysis by Status:")
print(customer_analysis)

# Write a DataFrame back to Athena (via S3)
# Note: this requires S3 write permissions for the staging location
df_to_write = df[df['total_spent'] > 1000]
df_to_write.to_sql(
    'high_value_customers',
    engine,
    if_exists='replace',
    index=False,
    method='multi'  # Batch insert for better performance
)
```

### Arrow Integration with SQLAlchemy

```python
from sqlalchemy import create_engine, text
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc

# Use the Arrow dialect for columnar operations
engine = create_engine("awsathena+arrow://...")

# Execute a query and build an Arrow Table from the result
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT
            product_category,
            COUNT(*) AS order_count,
            SUM(amount) AS total_revenue,
            AVG(amount) AS avg_order_value
        FROM orders
        GROUP BY product_category
        ORDER BY total_revenue DESC
    """))

    # pa.Table.from_pandas expects a DataFrame, so convert the rows first
    df = pd.DataFrame(result.fetchall(), columns=result.keys())
    arrow_table = pa.Table.from_pandas(df)

# High-performance columnar operations
total_revenue = pc.sum(arrow_table.column('total_revenue'))
print(f"Total revenue across all categories: ${total_revenue.as_py():,.2f}")
```

### DDL Operations

```python
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String, DateTime, text
from sqlalchemy.schema import CreateTable

engine = create_engine("awsathena+rest://...")
metadata = MetaData()

# Define the table structure. Athena tables are backed by S3 and do not
# support key constraints, so the storage location is supplied via the
# dialect-specific table kwarg instead of primary keys
analytics_table = Table(
    'user_analytics',
    metadata,
    Column('user_id', Integer),
    Column('session_date', DateTime),
    Column('page_views', Integer),
    Column('session_duration', Integer),
    Column('user_agent', String(500)),
    Column('referrer', String(200)),
    awsathena_location='s3://my-bucket/user-analytics/'
)

# Generate the CREATE TABLE statement
create_stmt = CreateTable(analytics_table)
print(str(create_stmt.compile(engine)))

# Create the table in Athena
with engine.connect() as conn:
    metadata.create_all(conn)
    print("Table created successfully")

# Create an external table pointing to S3 data
external_table_ddl = text("""
    CREATE EXTERNAL TABLE IF NOT EXISTS web_logs (
        timestamp string,
        ip_address string,
        user_agent string,
        request_url string,
        response_code int,
        bytes_sent bigint
    )
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://my-bucket/web-logs/'
    TBLPROPERTIES ('has_encrypted_data'='false')
""")

with engine.connect() as conn:
    conn.execute(external_table_ddl)
    print("External table created")
```

### Advanced Query Patterns

```python
from datetime import datetime
from sqlalchemy import Boolean, DateTime, Integer, bindparam, create_engine, text

engine = create_engine("awsathena+rest://...")

# Parameterized queries with SQLAlchemy; note bindparams() (plural)
# and the type_= keyword for typed parameters
parameterized_query = text("""
    SELECT
        customer_id,
        name,
        total_spent,
        CASE
            WHEN total_spent > :high_threshold THEN 'High Value'
            WHEN total_spent > :medium_threshold THEN 'Medium Value'
            ELSE 'Low Value'
        END AS customer_segment
    FROM customers
    WHERE created_at >= :start_date
      AND is_active = :active_status
    ORDER BY total_spent DESC
    LIMIT :limit_count
""").bindparams(
    bindparam('high_threshold', type_=Integer),
    bindparam('medium_threshold', type_=Integer),
    bindparam('start_date', type_=DateTime),
    bindparam('active_status', type_=Boolean),
    bindparam('limit_count', type_=Integer)
)

# Execute with parameters
with engine.connect() as conn:
    result = conn.execute(parameterized_query, {
        'high_threshold': 1000,
        'medium_threshold': 500,
        'start_date': datetime(2023, 1, 1),
        'active_status': True,
        'limit_count': 100
    })

    customers = result.fetchall()
    for customer in customers:
        print(f"{customer.name}: ${customer.total_spent:.2f} ({customer.customer_segment})")

# Window functions
window_query = text("""
    SELECT
        customer_id,
        order_date,
        amount,
        ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) AS order_sequence,
        SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date
                          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS running_total,
        LAG(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) AS prev_order_amount
    FROM orders
    WHERE order_date >= DATE '2023-01-01'
    ORDER BY customer_id, order_date
""")

with engine.connect() as conn:
    result = conn.execute(window_query)
    for row in result:
        print(f"Customer {row.customer_id}: Order {row.order_sequence}, "
              f"Amount: ${row.amount:.2f}, Running Total: ${row.running_total:.2f}")
```

### Connection Pooling and Performance

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Configure connection pooling
engine = create_engine(
    "awsathena+rest://...",
    poolclass=QueuePool,
    pool_size=5,          # Number of connections to maintain
    max_overflow=10,      # Additional connections allowed
    pool_pre_ping=True,   # Verify connections before use
    pool_recycle=3600,    # Recycle connections after 1 hour
    connect_args={
        'poll_interval': 1,
        'kill_on_interrupt': True
    }
)

# Monitor the connection pool
def check_pool_status():
    pool = engine.pool
    print(f"Pool size: {pool.size()}")
    print(f"Checked out connections: {pool.checkedout()}")
    print(f"Overflow connections: {pool.overflow()}")

# Use context managers for proper connection handling
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM large_table"))
    count = result.scalar()
    print(f"Table has {count} rows")

check_pool_status()
```

### Transaction Handling

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

engine = create_engine("awsathena+rest://...")

# Note: Athena doesn't support traditional transactions,
# but SQLAlchemy can still manage connection-level operations.

def safe_bulk_operation():
    with engine.connect() as conn:
        try:
            # Multiple related operations
            conn.execute(text(
                "CREATE TABLE temp_results AS "
                "SELECT * FROM source_table WHERE condition = 1"
            ))

            conn.execute(text("""
                INSERT INTO final_table
                SELECT customer_id, SUM(amount) AS total
                FROM temp_results
                GROUP BY customer_id
            """))

            conn.execute(text("DROP TABLE temp_results"))

            print("Bulk operation completed successfully")

        except SQLAlchemyError as e:
            print(f"Operation failed: {e}")
            # Clean up the intermediate table if it was created
            try:
                conn.execute(text("DROP TABLE IF EXISTS temp_results"))
            except SQLAlchemyError:
                pass
            raise

safe_bulk_operation()
```

## Connection String Formats

### Basic Connection String

```
awsathena+rest://aws_access_key_id:aws_secret_access_key@athena.region.amazonaws.com:443/schema_name?s3_staging_dir=s3://bucket/path/
```

### With All Parameters

```
awsathena+rest://access_key:secret_key@athena.us-west-2.amazonaws.com:443/default?s3_staging_dir=s3://my-bucket/results/&work_group=primary&catalog_name=AwsDataCatalog&region_name=us-west-2
```
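
Rather than hand-encoding these strings, SQLAlchemy's URL builder can assemble and escape them; a sketch with placeholder credentials:

```python
from sqlalchemy import create_engine
from sqlalchemy.engine import URL

url = URL.create(
    drivername="awsathena+rest",
    username="access_key",
    password="secret_key",  # escaped automatically when the URL is rendered
    host="athena.us-west-2.amazonaws.com",
    port=443,
    database="default",
    query={
        "s3_staging_dir": "s3://my-bucket/results/",
        "work_group": "primary",
        "catalog_name": "AwsDataCatalog",
    },
)
engine = create_engine(url)
```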

### Using Environment Variables

```python
import os
from sqlalchemy import create_engine

# Set environment variables
os.environ['AWS_ACCESS_KEY_ID'] = 'your_access_key'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'your_secret_key'
os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'

# Simplified connection string
engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)
```

## Dialect-Specific Features

- **Base Dialect**: Standard SQLAlchemy operations with tuple results
- **REST Dialect**: Optimized for general-purpose queries
- **Pandas Dialect**: Automatic DataFrame conversion for analytical queries
- **Arrow Dialect**: Columnar processing for high-performance analytics

Each dialect supports the same SQL operations but optimizes result processing for different use cases.
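
As a closing sketch, the same query text runs unchanged under any of the dialects; only the URL scheme differs (host and staging bucket below are placeholders):

```python
from sqlalchemy import create_engine, text

QUERY = text("SELECT customer_id, total_spent FROM customers LIMIT 10")

for scheme in ("awsathena+rest", "awsathena+pandas", "awsathena+arrow"):
    engine = create_engine(
        f"{scheme}://:@athena.us-west-2.amazonaws.com:443/default?"
        "s3_staging_dir=s3://my-bucket/athena-results/"
    )
    with engine.connect() as conn:
        rows = conn.execute(QUERY).fetchall()
        print(scheme, len(rows))  # same rows; result processing differs internally
```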