0
# SQLAlchemy Integration
1
2
High-level database operations built on SQLAlchemy's SQL expression language, with async/await support. The `aiomysql.sa` subpackage provides SQLAlchemy-compatible engine, connection, transaction, and result interfaces for these operations.
3
4
## Core Imports
5
6
```python
7
import aiomysql.sa
8
from aiomysql.sa import create_engine, SAConnection, Engine
9
```
10
11
## Capabilities
12
13
### Engine Creation
14
15
Create a SQLAlchemy-compatible async engine with connection pooling.
16
17
```python { .api }
18
async def create_engine(
    minsize: int = 1,
    maxsize: int = 10,
    loop=None,
    dialect=None,
    pool_recycle: int = -1,
    compiled_cache=None,
    **kwargs
) -> Engine:
    """
    Create a SQLAlchemy-compatible async engine backed by a connection pool.

    Parameters:
    - minsize: Minimum number of connections in pool
    - maxsize: Maximum number of connections in pool
    - loop: Event loop to use
    - dialect: SQLAlchemy dialect instance
    - pool_recycle: Seconds after which to recreate connections
      (the default of -1 disables recycling)
    - compiled_cache: Compiled query cache
    - **kwargs: Connection parameters (same as connect())

    Returns:
    Engine context manager
    """
42
```
43
44
### Engine Management
45
46
The Engine class manages connection pools and provides SQLAlchemy-style database access.
47
48
```python { .api }
49
class Engine:
    """Manages a connection pool and provides SQLAlchemy-style database access."""

    @property
    def dialect(self):
        """SQLAlchemy dialect instance."""

    @property
    def name(self) -> str:
        """Engine name."""

    @property
    def driver(self) -> str:
        """Database driver name."""

    @property
    def minsize(self) -> int:
        """Minimum pool size."""

    @property
    def maxsize(self) -> int:
        """Maximum pool size."""

    @property
    def size(self) -> int:
        """Current pool size."""

    @property
    def freesize(self) -> int:
        """Number of free connections."""

    def acquire(self) -> SAConnection:
        """
        Acquire a connection from the engine pool.

        Returns:
        SAConnection context manager
        """

    def release(self, conn: SAConnection) -> None:
        """
        Return a connection to the pool.

        Parameters:
        - conn: Connection to release
        """

    def close(self) -> None:
        """Close the engine and mark connections for closure."""

    def terminate(self) -> None:
        """Terminate engine immediately, closing all connections."""

    async def wait_closed(self) -> None:
        """Wait for engine to be completely closed."""
102
```
103
104
### SQLAlchemy Connection
105
106
SQLAlchemy-style connection wrapper providing high-level database operations.
107
108
```python { .api }
109
class SAConnection:
    """SQLAlchemy-style connection wrapper providing high-level database operations."""

    @property
    def closed(self) -> bool:
        """Whether the connection is closed."""

    @property
    def connection(self) -> Connection:
        """Underlying aiomysql connection."""

    @property
    def in_transaction(self) -> bool:
        """Whether connection is in a transaction."""

    def execute(self, query, *multiparams, **params) -> ResultProxy:
        """
        Execute a SQLAlchemy query.

        The call is awaited to obtain the result:
        ``result = await conn.execute(query)``.

        Parameters:
        - query: SQLAlchemy selectable or text query
        - multiparams: Multiple parameter sets for executemany-style
        - params: Single parameter set

        Returns:
        ResultProxy for accessing results
        """

    async def scalar(self, query, *multiparams, **params):
        """
        Execute query and return scalar result.

        Parameters:
        - query: SQLAlchemy selectable or text query
        - multiparams: Multiple parameter sets
        - params: Single parameter set

        Returns:
        Single scalar value
        """

    def begin(self) -> RootTransaction:
        """
        Begin a transaction.

        Intended for use as ``async with conn.begin():``.

        Returns:
        Transaction context manager
        """

    async def begin_nested(self) -> NestedTransaction:
        """
        Begin a nested transaction (savepoint).

        This is a coroutine: await it to obtain the transaction handle,
        then call ``commit()`` or ``rollback()`` on that handle.

        Returns:
        Nested transaction instance
        """

    async def begin_twophase(self, xid=None) -> TwoPhaseTransaction:
        """
        Begin a two-phase transaction.

        Parameters:
        - xid: Transaction ID (generated if None)

        Returns:
        Two-phase transaction instance
        """

    async def recover_twophase(self) -> list:
        """
        Get list of prepared transaction IDs.

        Returns:
        List of transaction IDs ready for commit/rollback
        """

    async def rollback_prepared(self, xid, *, is_prepared: bool = True) -> None:
        """
        Rollback a prepared two-phase transaction.

        Parameters:
        - xid: Transaction ID to rollback
        - is_prepared: Whether transaction is already prepared
        """

    async def commit_prepared(self, xid, *, is_prepared: bool = True) -> None:
        """
        Commit a prepared two-phase transaction.

        Parameters:
        - xid: Transaction ID to commit
        - is_prepared: Whether transaction is already prepared
        """

    async def close(self) -> None:
        """Close the connection."""
203
```
204
205
### Transaction Classes
206
207
Transaction management classes for various transaction types.
208
209
```python { .api }
210
class Transaction:
    """Common interface shared by all transaction types."""

    @property
    def is_active(self) -> bool:
        """Whether transaction is active."""

    @property
    def connection(self) -> SAConnection:
        """Connection associated with transaction."""

    async def commit(self) -> None:
        """Commit the transaction."""

    async def rollback(self) -> None:
        """Rollback the transaction."""

    async def close(self) -> None:
        """Close the transaction."""


class RootTransaction(Transaction):
    """Root-level database transaction."""


class NestedTransaction(Transaction):
    """Nested transaction using savepoints."""


class TwoPhaseTransaction(Transaction):
    """Two-phase (XA) transaction."""

    @property
    def xid(self) -> str:
        """Transaction ID."""

    async def prepare(self) -> None:
        """Prepare transaction for commit."""
243
```
244
245
### Result Classes
246
247
Classes for handling query results in SQLAlchemy style.
248
249
```python { .api }
250
class ResultProxy:
    """Query result handle with SQLAlchemy-style row access."""

    @property
    def dialect(self):
        """SQLAlchemy dialect."""

    @property
    def cursor(self) -> Cursor:
        """Underlying cursor."""

    @property
    def rowcount(self) -> int:
        """Number of affected rows."""

    @property
    def lastrowid(self) -> int:
        """Last inserted row ID."""

    @property
    def returns_rows(self) -> bool:
        """Whether query returns rows."""

    @property
    def closed(self) -> bool:
        """Whether result is closed."""

    async def fetchone(self) -> RowProxy:
        """
        Fetch next row.

        Returns:
        RowProxy instance or None
        """

    async def fetchall(self) -> list:
        """
        Fetch all remaining rows.

        Returns:
        List of RowProxy instances
        """

    async def fetchmany(self, size: int = None) -> list:
        """
        Fetch multiple rows.

        Parameters:
        - size: Number of rows to fetch

        Returns:
        List of RowProxy instances
        """

    async def first(self) -> RowProxy:
        """
        Fetch first row and close result.

        Returns:
        First RowProxy or None
        """

    async def scalar(self):
        """
        Fetch scalar value from first row.

        Returns:
        Single value from first column of first row
        """

    def keys(self) -> list:
        """
        Get column names.

        Returns:
        List of column names
        """

    async def close(self) -> None:
        """Close the result."""
328
329
class RowProxy:
    """Single result row with dict-like and sequence-like access."""

    def as_tuple(self) -> tuple:
        """
        Convert row to tuple.

        Returns:
        Row data as tuple
        """

    def __getitem__(self, key):
        """
        Access column by name or index.

        Parameters:
        - key: Column name (str) or index (int)

        Returns:
        Column value
        """

    def __getattr__(self, name):
        """
        Access column as attribute.

        Parameters:
        - name: Column name

        Returns:
        Column value
        """

    def __contains__(self, key) -> bool:
        """
        Check if column exists.

        Parameters:
        - key: Column name

        Returns:
        True if column exists
        """
372
```
373
374
### Exception Classes
375
376
Exceptions specific to the SQLAlchemy integration.
377
378
```python { .api }
379
class Error(Exception):
    """Base SQLAlchemy integration error."""


class ArgumentError(Error):
    """Invalid or conflicting function arguments."""


class InvalidRequestError(Error):
    """Invalid operations or runtime state errors."""


class NoSuchColumnError(Error):
    """Nonexistent column requested from RowProxy."""


class ResourceClosedError(Error):
    """Operation on closed connection/cursor/result."""
393
```
394
395
## Usage Examples
396
397
### Basic SQLAlchemy Usage
398
399
```python
400
import asyncio
401
import sqlalchemy as sa
402
import aiomysql.sa
403
404
async def sqlalchemy_basic_example():
    """Select users with id < 10 and print each one's name and email."""
    # Create engine
    engine = await aiomysql.sa.create_engine(
        host='localhost',
        port=3306,
        user='myuser',
        password='mypass',
        db='mydatabase',
        minsize=1,
        maxsize=5,
    )

    try:
        # Define table metadata
        metadata = sa.MetaData()
        users = sa.Table(
            'users', metadata,
            sa.Column('id', sa.Integer, primary_key=True),
            sa.Column('name', sa.String(50)),
            sa.Column('email', sa.String(100)),
        )

        # Execute queries
        async with engine.acquire() as conn:
            # Select query
            query = sa.select([users]).where(users.c.id < 10)
            result = await conn.execute(query)
            try:
                async for row in result:
                    print(f"User: {row.name} ({row.email})")
            finally:
                # Close the result even if iteration fails part-way
                await result.close()
    finally:
        # Cleanup: shut the pool down even when a query raised
        engine.close()
        await engine.wait_closed()


if __name__ == "__main__":
    asyncio.run(sqlalchemy_basic_example())
439
```
440
441
### Transaction Management
442
443
```python
444
async def transaction_example():
    """Move 100 from user 1's balance to user 2's inside one transaction."""
    engine = await aiomysql.sa.create_engine(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase',
    )

    try:
        metadata = sa.MetaData()
        users = sa.Table(
            'users', metadata,
            sa.Column('id', sa.Integer, primary_key=True),
            sa.Column('name', sa.String(50)),
            sa.Column('balance', sa.Numeric(10, 2)),
        )

        async with engine.acquire() as conn:
            try:
                # Begin transaction: commits automatically when the block
                # exits normally, rolls back automatically on error
                async with conn.begin():
                    # Transfer money between users
                    await conn.execute(
                        users.update().where(users.c.id == 1).values(
                            balance=users.c.balance - 100
                        )
                    )
                    await conn.execute(
                        users.update().where(users.c.id == 2).values(
                            balance=users.c.balance + 100
                        )
                    )
            except Exception as e:
                # The transaction has been rolled back by the time we get here
                print(f"Transfer failed: {e}")
                raise
            else:
                # Report success only after the commit has actually happened
                print("Transfer completed successfully")
    finally:
        # Runs on the re-raise path too, so the pool is never leaked
        engine.close()
        await engine.wait_closed()
484
```
485
486
### Nested Transactions (Savepoints)
487
488
```python
489
async def nested_transaction_example():
    """Create an order, then try a discount inside a savepoint.

    If the discount step fails, only the savepoint is rolled back and the
    order itself is still committed by the outer transaction.
    """
    engine = await aiomysql.sa.create_engine(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase',
    )

    try:
        metadata = sa.MetaData()
        orders = sa.Table(
            'orders', metadata,
            sa.Column('id', sa.Integer, primary_key=True),
            sa.Column('user_id', sa.Integer),
            sa.Column('total', sa.Numeric(10, 2)),
        )

        async with engine.acquire() as conn:
            async with conn.begin():
                # Main transaction: create order
                result = await conn.execute(
                    orders.insert().values(user_id=123, total=250.00)
                )
                order_id = result.lastrowid

                # Nested transaction: try to apply discount.
                # begin_nested() is a coroutine, so it must be awaited to
                # obtain the savepoint handle before it can be used.
                nested_trans = await conn.begin_nested()
                try:
                    # Try to apply discount
                    await conn.execute(
                        orders.update().where(orders.c.id == order_id).values(
                            total=orders.c.total * 0.9  # 10% discount
                        )
                    )

                    # Simulate discount validation failure
                    if order_id % 2 == 0:  # Even order IDs fail
                        raise ValueError("Discount not applicable")
                except ValueError:
                    # Nested transaction rolls back, main continues
                    await nested_trans.rollback()
                    print("Discount failed, order created without discount")
                else:
                    await nested_trans.commit()
                    print("Discount applied successfully")

                print(f"Order {order_id} completed")
    finally:
        # Always shut the pool down, even when a statement raised
        engine.close()
        await engine.wait_closed()
535
```
536
537
### Complex Queries with Joins
538
539
```python
540
async def complex_query_example():
    """Join users to departments and list the Engineering/Marketing staff."""
    engine = await aiomysql.sa.create_engine(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase',
    )

    try:
        metadata = sa.MetaData()
        users = sa.Table(
            'users', metadata,
            sa.Column('id', sa.Integer, primary_key=True),
            sa.Column('name', sa.String(50)),
            sa.Column('department_id', sa.Integer),
        )

        departments = sa.Table(
            'departments', metadata,
            sa.Column('id', sa.Integer, primary_key=True),
            sa.Column('name', sa.String(50)),
        )

        async with engine.acquire() as conn:
            # Complex join query
            query = sa.select([
                users.c.name.label('user_name'),
                departments.c.name.label('dept_name'),
            ]).select_from(
                users.join(departments, users.c.department_id == departments.c.id)
            ).where(
                departments.c.name.in_(['Engineering', 'Marketing'])
            ).order_by(users.c.name)

            result = await conn.execute(query)
            try:
                print("Users in Engineering and Marketing:")
                rows = await result.fetchall()
                for row in rows:
                    print(f"{row.user_name} - {row.dept_name}")
            finally:
                # Close the result even if fetching or printing fails
                await result.close()
    finally:
        # Always shut the pool down, even when the query raised
        engine.close()
        await engine.wait_closed()
580
```
581
582
### Raw SQL with Parameters
583
584
```python
585
async def raw_sql_example():
    """Run a parameterized raw SQL query and print customers by order count."""
    engine = await aiomysql.sa.create_engine(
        host='localhost',
        user='myuser',
        password='mypass',
        db='mydatabase',
    )

    try:
        async with engine.acquire() as conn:
            # Raw SQL with parameters (bound by name, never string-formatted)
            query = sa.text("""
                SELECT u.name, COUNT(o.id) as order_count
                FROM users u
                LEFT JOIN orders o ON u.id = o.user_id
                WHERE u.created_at >= :start_date
                GROUP BY u.id, u.name
                HAVING COUNT(o.id) >= :min_orders
                ORDER BY order_count DESC
            """)

            result = await conn.execute(query, {
                'start_date': '2023-01-01',
                'min_orders': 5,
            })
            try:
                print("Top customers by order count:")
                async for row in result:
                    print(f"{row.name}: {row.order_count} orders")
            finally:
                # Close the result even if iteration fails part-way
                await result.close()
    finally:
        # Always shut the pool down, even when the query raised
        engine.close()
        await engine.wait_closed()
618
```