# Async Support

SQLAlchemy's asyncio extension provides async engines, connections, sessions, and ORM support, so asyncio applications can run database operations without blocking the event loop.

## Capabilities

### Async Engine Creation

Create asynchronous database engines for async/await database operations.

```python { .api }
def create_async_engine(url, **kwargs):
    """
    Create asynchronous database engine.

    This is a regular (non-async) function; it is called without await.

    Parameters:
    - url: str or URL, database connection URL with async driver
    - echo: bool, log all SQL statements
    - pool_size: int, connection pool size
    - max_overflow: int, maximum pool overflow
    - pool_timeout: int, seconds to wait for a connection from the pool
    - pool_recycle: int, seconds after which a connection is recycled

    Returns:
    AsyncEngine: Asynchronous database engine
    """

class AsyncEngine:
    """Asynchronous database engine with connection pooling."""

    def connect(self):
        """
        Create new async database connection.

        The returned object is awaited or used with ``async with``.

        Returns:
        AsyncConnection: New async database connection
        """

    # AsyncEngine does not provide an execute() method. Acquire a
    # connection with connect() or begin() and call
    # AsyncConnection.execute() on it.

    def begin(self):
        """
        Begin transaction with automatic connection management.

        Used as ``async with engine.begin() as conn``. The transaction is
        committed when the block exits normally and rolled back on error.

        Returns:
        Async context manager yielding an AsyncConnection
        """

    async def dispose(self):
        """Close all connections and dispose of connection pool."""

    @property
    def sync_engine(self):
        """
        Underlying synchronous engine (an attribute, not a method call).

        Returns:
        Engine: Synchronous engine proxied by this AsyncEngine
        """

    @property
    def dialect(self):
        """Database dialect for this engine."""
```

### Async Connection Management

Asynchronous connection handling with transaction support.

```python { .api }
class AsyncConnection:
    """Asynchronous database connection with transaction support."""

    async def execute(self, statement, parameters=None):
        """
        Execute SQL statement on this async connection.

        Parameters:
        - statement: executable SQL construct such as text() or select()
        - parameters: dict or sequence, bound parameters

        Returns:
        Result: Query results
        """

    def begin(self):
        """
        Begin transaction on this connection.

        The returned object is awaited or used with ``async with``.

        Returns:
        AsyncTransaction: Async transaction object
        """

    async def commit(self):
        """Commit current transaction."""

    async def rollback(self):
        """Rollback current transaction."""

    async def close(self):
        """Close this async connection."""

    async def scalar(self, statement, parameters=None):
        """
        Execute statement and return scalar result.

        Parameters:
        - statement: executable SQL construct such as text() or select()
        - parameters: dict or sequence, bound parameters

        Returns:
        Any: Single scalar value
        """

    def get_transaction(self):
        """
        Get current transaction for this connection.

        Returns:
        AsyncTransaction or None: Current transaction
        """
```

### Async Transaction Management

Asynchronous transaction handling with context manager support.

```python { .api }
class AsyncTransaction:
    """Asynchronous database transaction with rollback support."""

    async def commit(self):
        """Commit this async transaction."""

    async def rollback(self):
        """Rollback this async transaction."""

    async def close(self):
        """Close transaction (rollback if not committed)."""

    @property
    def is_active(self):
        """
        Check if transaction is active (read as an attribute).

        Returns:
        bool: True if transaction is active
        """
```

### Async ORM Session

Asynchronous ORM session with identity map and unit of work patterns.

```python { .api }
class AsyncSession:
    """Asynchronous ORM session with identity map and unit of work."""

    def __init__(self, bind, **kwargs):
        """
        Create async ORM session.

        Parameters:
        - bind: AsyncEngine for database operations
        - autoflush: bool, auto-flush before queries (default True)
        - expire_on_commit: bool, expire objects after commit (default True)
        """

    def add(self, instance):
        """
        Add object instance to session.

        Parameters:
        - instance: mapped object to add
        """

    def add_all(self, instances):
        """
        Add multiple object instances to session.

        Parameters:
        - instances: iterable of mapped objects
        """

    async def delete(self, instance):
        """
        Mark object instance for deletion.

        Parameters:
        - instance: mapped object to delete
        """

    async def commit(self):
        """Flush pending changes and commit transaction."""

    async def rollback(self):
        """Rollback current transaction and expire all objects."""

    async def flush(self):
        """Flush pending changes to database without committing."""

    def expunge(self, instance):
        """
        Remove instance from session without deleting.

        Parameters:
        - instance: mapped object to remove from session
        """

    def expunge_all(self):
        """Remove all instances from session."""

    async def refresh(self, instance, attribute_names=None):
        """
        Refresh object from database.

        Parameters:
        - instance: mapped object to refresh
        - attribute_names: specific attributes to refresh
        """

    async def merge(self, instance):
        """
        Merge detached instance into session.

        Parameters:
        - instance: detached mapped object

        Returns:
        object: Merged persistent instance
        """

    async def execute(self, statement, parameters=None, **kwargs):
        """
        Execute statement with ORM-level processing.

        Parameters:
        - statement: SQL statement or ORM query
        - parameters: bind parameters

        Returns:
        Result: Query results
        """

    async def scalar(self, statement, parameters=None, **kwargs):
        """
        Execute statement and return scalar result.

        Parameters:
        - statement: SQL statement or ORM query
        - parameters: bind parameters

        Returns:
        Any: Scalar result value
        """

    async def get(self, entity, ident):
        """
        Get object by primary key.

        Parameters:
        - entity: mapped class
        - ident: primary key value or tuple

        Returns:
        object or None: Object instance or None if not found
        """

    async def stream(self, statement):
        """
        Execute statement and return async result stream.

        Parameters:
        - statement: SQL statement or ORM query

        Returns:
        AsyncResult: Streaming query results
        """

    async def close(self):
        """Close the async session."""

def async_sessionmaker(bind=None, **kwargs):
    """
    Create AsyncSession factory.

    Parameters:
    - bind: AsyncEngine for database operations
    - kwargs: AsyncSession configuration options

    Returns:
    async_sessionmaker: AsyncSession factory class
    """

def async_scoped_session(session_factory, scopefunc):
    """
    Create async scoped session keyed by a scope function.

    Parameters:
    - session_factory: async_sessionmaker instance
    - scopefunc: callable returning the current scope key,
      for example asyncio.current_task

    Returns:
    async_scoped_session: Scope-local async session proxy
    """
```

### Async Result Processing

Asynchronous result iteration and processing.

```python { .api }
class AsyncResult:
    """Asynchronous query result with async iteration."""

    async def fetchone(self):
        """
        Fetch next row asynchronously.

        Returns:
        Row or None: Next row or None if no more rows
        """

    async def fetchmany(self, size=None):
        """
        Fetch multiple rows asynchronously.

        Parameters:
        - size: int, number of rows to fetch

        Returns:
        List[Row]: List of rows
        """

    async def fetchall(self):
        """
        Fetch all remaining rows asynchronously.

        Returns:
        List[Row]: All remaining rows
        """

    async def scalar(self):
        """
        Fetch scalar value from first column of first row.

        Returns:
        Any: Scalar value or None
        """

    def mappings(self):
        """
        Return result as async mapping-like objects.

        Returns:
        AsyncMappingResult: Result with dict-like row access
        """

    def scalars(self, index=0):
        """
        Return result as async scalar values.

        Parameters:
        - index: int, column index for scalar extraction

        Returns:
        AsyncScalarResult: Result with scalar value iteration
        """

    def partitions(self, size=None):
        """
        Partition result into chunks for async processing.

        Parameters:
        - size: int, partition size

        Returns:
        AsyncIterator: Async iterator of row partitions
        """

    def __aiter__(self):
        """Async iterator support for result rows (returns self)."""

    async def __anext__(self):
        """Get next row in async iteration."""

class AsyncScalarResult:
    """Async result optimized for scalar value iteration."""

    async def all(self):
        """
        Fetch all scalar values.

        Returns:
        List[Any]: All scalar values
        """

    async def first(self):
        """
        Fetch first scalar value.

        Returns:
        Any or None: First scalar value or None
        """

    async def one(self):
        """
        Fetch exactly one scalar value.

        Returns:
        Any: Single scalar value

        Raises:
        NoResultFound: If no results
        MultipleResultsFound: If multiple results
        """

    async def one_or_none(self):
        """
        Fetch one scalar value or None.

        Returns:
        Any or None: Single scalar value or None

        Raises:
        MultipleResultsFound: If multiple results
        """
```

### Async Utilities and Helpers

Utility functions for async SQLAlchemy operations.

```python { .api }
# Method of AsyncConnection and AsyncSession, not a module-level function
async def run_sync(self, fn, *args, **kwargs):
    """
    Run synchronous function in async context.

    The callable receives the underlying sync Connection (or Session)
    as its first argument.

    Parameters:
    - fn: synchronous function to run
    - args: positional arguments
    - kwargs: keyword arguments

    Returns:
    Any: Function result
    """

# Available as sqlalchemy.util.greenlet_spawn; must be awaited
async def greenlet_spawn(fn, *args, **kwargs):
    """
    Spawn greenlet for sync operations in async context.

    Parameters:
    - fn: function to run in greenlet
    - args: positional arguments
    - kwargs: keyword arguments

    Returns:
    Any: Function result
    """
```

## Usage Examples

### Basic Async Engine Usage

```python
import asyncio
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped
from sqlalchemy import String, select

# Define models
class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = 'users'

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(String(50))
    email: Mapped[str] = mapped_column(String(100))

async def main():
    # Create async engine (note: driver must support async)
    engine = create_async_engine(
        "postgresql+asyncpg://user:pass@localhost/dbname",
        echo=True
    )

    # Create tables (run_sync executes the sync DDL call on this connection)
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # Use async session
    async with AsyncSession(engine) as session:
        # Add new user
        new_user = User(name="Alice", email="alice@example.com")
        session.add(new_user)
        await session.commit()

        # Query users
        stmt = select(User).where(User.name.like('%Alice%'))
        result = await session.execute(stmt)
        users = result.scalars().all()

        for user in users:
            print(f"User: {user.name}, Email: {user.email}")

    await engine.dispose()

# Run async function
asyncio.run(main())
```

### Async Connection Context Management

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import create_async_engine

# Uses the User model from the basic example; its table must already exist

async def database_operations():
    engine = create_async_engine("sqlite+aiosqlite:///async_example.db")

    # Direct connection usage
    async with engine.connect() as conn:
        # Execute raw SQL
        result = await conn.execute(text("SELECT 1"))
        value = result.scalar()
        print(f"Result: {value}")

        # execute() autobegins a transaction; end it first, because
        # conn.begin() raises if a transaction is already in progress
        await conn.rollback()

        # Transaction management
        async with conn.begin():
            await conn.execute(
                User.__table__.insert().values(name="Bob", email="bob@example.com")
            )
        # Automatically committed when the block exits

    await engine.dispose()
```

### Streaming Results

```python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

async def stream_large_dataset():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

    async with AsyncSession(engine) as session:
        # Stream results for memory efficiency
        # (assumes the User model also defines a boolean `active` column)
        stmt = select(User).where(User.active.is_(True))
        stream = await session.stream(stmt)

        async for user in stream.scalars():
            print(f"Processing user: {user.name}")
            # Process user without loading all into memory

    await engine.dispose()
```

### Async Session Factory

```python
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

# Create reusable session factory
async_session = async_sessionmaker(
    create_async_engine("postgresql+asyncpg://user:pass@localhost/db"),
    expire_on_commit=False
)

async def get_user_by_id(user_id: int):
    async with async_session() as session:
        return await session.get(User, user_id)

async def create_user(name: str, email: str):
    async with async_session() as session:
        user = User(name=name, email=email)
        session.add(user)
        await session.commit()
        # Attributes stay loaded after commit because expire_on_commit=False
        return user
```

### Mixing Sync and Async Operations

```python
async def mixed_operations():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

    # The underlying sync engine is exposed for sync-only APIs such as
    # event listeners; it cannot perform database I/O directly from
    # async code with an async driver
    sync_engine = engine.sync_engine

    # Run sync-only metadata operations through run_sync()
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

    # Use async engine for data operations
    async with AsyncSession(engine) as session:
        stmt = select(User).limit(10)
        result = await session.execute(stmt)
        users = result.scalars().all()

        for user in users:
            print(f"User: {user.name}")

    await engine.dispose()
```

### Error Handling in Async Context

```python
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError, NoResultFound
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine

async def safe_user_operations():
    engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

    async with AsyncSession(engine) as session:
        try:
            # Attempt to create user with duplicate email
            # (assumes a unique constraint on the email column)
            user = User(name="Test", email="existing@example.com")
            session.add(user)
            await session.commit()
        except IntegrityError:
            await session.rollback()
            print("User with this email already exists")

        try:
            # Attempt to get non-existent user
            stmt = select(User).where(User.id == 99999)
            result = await session.execute(stmt)
            user = result.scalar_one()  # Raises if not found
        except NoResultFound:
            print("User not found")

    await engine.dispose()
```

### Async Database Driver Requirements

```python
# Different async drivers for different databases:

# PostgreSQL with asyncpg
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")

# MySQL with aiomysql
engine = create_async_engine("mysql+aiomysql://user:pass@host/db")

# SQLite with aiosqlite
engine = create_async_engine("sqlite+aiosqlite:///path/to/database.db")

# Note: The underlying DBAPI driver must support async operations
# Standard drivers like psycopg2, PyMySQL, sqlite3 do NOT work with async engines
```