# Asynchronous Operations

Perform database operations asynchronously with asyncio and full async/await support, providing non-blocking database access for high-concurrency applications.

## Async Connection Management

### Async Connection Functions

```python { .api }
import mysql.connector.aio

async def connect(**kwargs) -> 'MySQLConnection':
    """
    Create async connection to MySQL server.

    This is a coroutine function: the call must be awaited to obtain
    the connection.

    Args:
        **kwargs: Connection settings, e.g. host, user, password, database.

    Returns:
        MySQLConnection instance for async operations
    """
    pass
```

### MySQLConnection (Async)

```python { .api }
class MySQLConnection:
    """
    Async connection class with asyncio support.
    Provides non-blocking database operations using async/await.

    Methods declared with ``async def`` are coroutines and must be awaited;
    plain methods and properties return immediately.
    """

    def __init__(self, **kwargs) -> None:
        """Initialize async connection with configuration."""
        pass

    async def connect(self) -> None:
        """Establish async connection to MySQL server."""
        pass

    async def disconnect(self) -> None:
        """Close async connection to MySQL server."""
        pass

    async def close(self) -> None:
        """Close connection (alias for disconnect)."""
        pass

    def is_connected(self) -> bool:
        """Check if connection is active (non-blocking check)."""
        pass

    async def ping(self, reconnect: bool = False, attempts: int = 1, delay: int = 0) -> None:
        """Test connection to server asynchronously."""
        pass

    async def reconnect(self, attempts: int = 1, delay: int = 0) -> None:
        """Reconnect to MySQL server asynchronously."""
        pass

    async def cursor(self,
                     buffered: Optional[bool] = None,
                     raw: Optional[bool] = None,
                     prepared: Optional[bool] = None,
                     cursor_class: Optional[Type] = None,
                     dictionary: Optional[bool] = None) -> 'MySQLCursor':
        """
        Create async cursor for executing SQL statements.

        This is a coroutine: use ``cursor = await connection.cursor()`` or
        ``async with await connection.cursor() as cursor:``.
        """
        pass

    async def commit(self) -> None:
        """Commit current transaction asynchronously."""
        pass

    async def rollback(self) -> None:
        """Rollback current transaction asynchronously."""
        pass

    async def start_transaction(self,
                                consistent_snapshot: bool = False,
                                isolation_level: Optional[str] = None,
                                readonly: Optional[bool] = None) -> None:
        """Start new transaction asynchronously."""
        pass

    @property
    def autocommit(self) -> bool:
        """Get autocommit mode status."""
        pass

    async def set_autocommit(self, value: bool) -> None:
        """Set autocommit mode asynchronously."""
        pass

    @property
    def database(self) -> str:
        """Get current database name."""
        pass

    async def set_database(self, value: str) -> None:
        """Change current database asynchronously."""
        pass

    @property
    def server_version(self) -> Tuple[int, int, int]:
        """Get MySQL server version tuple."""
        pass

    @property
    def connection_id(self) -> int:
        """Get MySQL connection ID."""
        pass

    @property
    def charset(self) -> str:
        """Get connection character set."""
        pass

    async def set_charset(self, value: str) -> None:
        """Set connection character set asynchronously."""
        pass

    async def cmd_query(self, query: Union[str, bytes]) -> Dict:
        """Execute query asynchronously and return raw result."""
        pass

    async def cmd_quit(self) -> bytes:
        """Send quit command to server asynchronously."""
        pass

    async def cmd_init_db(self, database: str) -> bytes:
        """Send init_db command to change database asynchronously."""
        pass

    async def cmd_refresh(self, options: int) -> bytes:
        """Send refresh command asynchronously."""
        pass

    async def cmd_statistics(self) -> Dict:
        """Get server statistics asynchronously."""
        pass

    async def cmd_ping(self) -> bytes:
        """Send ping command to server asynchronously."""
        pass

    async def reset_session(self,
                            user_variables: Optional[Dict] = None,
                            session_variables: Optional[Dict] = None) -> None:
        """Reset session to initial state asynchronously."""
        pass

    async def get_warnings(self, count: Optional[int] = None) -> List[Tuple]:
        """Get warning messages from last statement asynchronously."""
        pass

    @property
    def warning_count(self) -> int:
        """Get warning count from last statement."""
        pass

    @property
    def info_msg(self) -> Optional[str]:
        """Get info message from last statement."""
        pass

    @property
    def insert_id(self) -> int:
        """Get auto-generated ID from last INSERT."""
        pass

    @property
    def affected_rows(self) -> int:
        """Get affected row count from last statement."""
        pass

    @property
    def in_transaction(self) -> bool:
        """Check if connection is in transaction."""
        pass

    async def __aenter__(self) -> 'MySQLConnection':
        """Async context manager entry."""
        pass

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit with automatic cleanup."""
        pass
```

### MySQLConnectionAbstract

```python { .api }
class MySQLConnectionAbstract:
    """
    Abstract base class for async connections.
    Defines interface for async connection implementations.
    """
    pass
```

## Async Cursor Operations

### MySQLCursor (Async)

```python { .api }
class MySQLCursor:
    """
    Async cursor for executing SQL statements.
    Provides non-blocking query execution and result fetching.

    Methods declared with ``async def`` are coroutines and must be awaited.
    The cursor also supports ``async for`` (row iteration) and ``async with``
    (automatic cleanup).
    """

    async def execute(self, operation: str, params: Optional[Union[Sequence, Dict]] = None, multi: bool = False) -> Optional[AsyncIterator]:
        """Execute SQL statement asynchronously with optional parameters."""
        pass

    async def executemany(self, operation: str, seq_params: Sequence[Union[Sequence, Dict]]) -> None:
        """Execute SQL statement multiple times asynchronously."""
        pass

    async def fetchone(self) -> Optional[Tuple]:
        """Fetch next row from result set asynchronously."""
        pass

    async def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
        """Fetch specified number of rows asynchronously."""
        pass

    async def fetchall(self) -> List[Tuple]:
        """Fetch all remaining rows asynchronously."""
        pass

    async def close(self) -> None:
        """Close cursor and free resources asynchronously."""
        pass

    async def callproc(self, procname: str, args: Sequence = ()) -> Optional[Dict]:
        """Call stored procedure asynchronously."""
        pass

    def stored_results(self) -> AsyncIterator['MySQLCursor']:
        """Return async iterator for stored procedure result sets."""
        pass

    async def nextset(self) -> Optional[bool]:
        """Skip to next result set asynchronously."""
        pass

    @property
    def description(self) -> Optional[List[Tuple]]:
        """Column metadata for last executed query."""
        pass

    @property
    def rowcount(self) -> int:
        """Number of rows affected by last operation."""
        pass

    @property
    def lastrowid(self) -> Optional[int]:
        """Auto-generated ID from last INSERT operation."""
        pass

    @property
    def arraysize(self) -> int:
        """Default number of rows fetchmany() should return."""
        pass

    @arraysize.setter
    def arraysize(self, value: int) -> None:
        """Set default fetchmany() size."""
        pass

    @property
    def statement(self) -> Optional[str]:
        """Last executed SQL statement."""
        pass

    @property
    def with_rows(self) -> bool:
        """Whether last operation produced result rows."""
        pass

    @property
    def column_names(self) -> Tuple[str, ...]:
        """Column names from result set."""
        pass

    def __aiter__(self) -> 'MySQLCursor':
        """Make cursor async iterable over result rows."""
        pass

    async def __anext__(self) -> Tuple:
        """Get next row for async iteration."""
        pass

    async def __aenter__(self) -> 'MySQLCursor':
        """Async context manager entry."""
        pass

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit with automatic cleanup."""
        pass
```

## Async Connection Pooling

### MySQLConnectionPool (Async)

```python { .api }
class MySQLConnectionPool:
    """
    Async connection pool manager.
    Manages pool of async database connections for efficient reuse.
    """

    def __init__(self,
                 pool_name: Optional[str] = None,
                 pool_size: int = 5,
                 pool_reset_session: bool = True,
                 **kwargs) -> None:
        """Initialize async connection pool."""
        pass

    async def get_connection(self) -> 'PooledMySQLConnection':
        """Get connection from pool asynchronously."""
        pass

    async def add_connection(self, cnx: Optional['MySQLConnection'] = None) -> 'PooledMySQLConnection':
        """Add connection to pool asynchronously."""
        pass

    def set_config(self, **kwargs) -> None:
        """Update pool configuration."""
        pass

    async def close(self) -> None:
        """Close all connections in pool asynchronously."""
        pass

    @property
    def pool_name(self) -> str:
        """Pool name identifier."""
        pass

    @property
    def pool_size(self) -> int:
        """Maximum pool size."""
        pass
```

### PooledMySQLConnection (Async)

```python { .api }
class PooledMySQLConnection:
    """
    Async pooled connection wrapper.
    Returns connection to pool on close in async context.
    """

    # Both annotations are quoted forward references, matching the style used
    # for the other cross-class references in this document.
    def __init__(self, pool: 'MySQLConnectionPool', cnx: 'MySQLConnection') -> None:
        """Initialize async pooled connection wrapper."""
        pass

    async def close(self) -> None:
        """Return connection to pool asynchronously."""
        pass

    @property
    def pool_name(self) -> str:
        """Name of the connection pool."""
        pass

    async def __aenter__(self) -> 'PooledMySQLConnection':
        """Async context manager entry."""
        pass

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit returning connection to pool."""
        pass
```

### Async Pooling Functions

```python { .api }
async def connect(**kwargs) -> Union['MySQLConnection', 'PooledMySQLConnection']:
    """
    Create async database connection with optional pooling.

    When pool parameters are provided, returns PooledMySQLConnection.
    Otherwise returns MySQLConnection.

    This is a coroutine function: the call must be awaited to obtain the
    connection (e.g. ``async with await connect(...) as connection:``).
    """
    pass
```

## Usage Examples

### Basic Async Connection

```python
import asyncio
import mysql.connector.aio

async def main():
    """Connect, run one query, print the rows, and always release resources."""
    # Create async connection
    connection = await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    )

    try:
        # Create async cursor (cursor() is a coroutine and must be awaited)
        cursor = await connection.cursor()

        try:
            # Execute query asynchronously
            await cursor.execute("SELECT id, name FROM users WHERE age > %s", (25,))

            # Fetch results asynchronously
            async for (user_id, name) in cursor:
                print(f"User {user_id}: {name}")
        finally:
            # Cleanup the cursor even if the query or iteration fails
            await cursor.close()
    finally:
        # Cleanup the connection even if an error occurred above
        await connection.close()

# Run async function
asyncio.run(main())
```

### Async Context Managers

```python
import asyncio
import mysql.connector.aio

async def main():
    """Count users, letting context managers close the cursor and connection."""
    # Automatic async connection cleanup.
    # connect() is a coroutine, so it is awaited first; the resulting
    # connection object is what implements the async context manager protocol.
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        # Automatic async cursor cleanup (cursor() is awaited for the same reason)
        async with await connection.cursor(dictionary=True) as cursor:
            await cursor.execute("SELECT COUNT(*) as total FROM users")
            result = await cursor.fetchone()
            print(f"Total users: {result['total']}")
        # Cursor automatically closed
    # Connection automatically closed

asyncio.run(main())
```

### Async Transaction Management

```python
import asyncio
import mysql.connector.aio

async def transfer_funds(from_account: int, to_account: int, amount: float):
    """
    Move `amount` between two accounts inside a single transaction.

    Both UPDATE statements are committed together; if either one matches no
    row, or any other error occurs, the transaction is rolled back and the
    exception is re-raised.
    """
    # connect() is a coroutine, so it is awaited before entering `async with`
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        try:
            # Start transaction
            await connection.start_transaction()

            async with await connection.cursor() as cursor:
                # Debit from account
                await cursor.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, from_account)
                )
                # rowcount only describes the most recent statement, so it
                # has to be checked after each UPDATE separately
                if cursor.rowcount == 0:
                    raise ValueError("Account not found")

                # Credit to account
                await cursor.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to_account)
                )
                if cursor.rowcount == 0:
                    raise ValueError("Account not found")

            # Commit transaction
            await connection.commit()
            print(f"Transferred {amount} from {from_account} to {to_account}")

        except Exception as err:
            # Rollback on error
            await connection.rollback()
            print(f"Transfer failed: {err}")
            raise

asyncio.run(transfer_funds(1, 2, 100.0))
```

### Async Connection Pooling

```python
import asyncio
import mysql.connector.aio

async def worker_task(worker_id: int):
    """Async worker using pooled connection."""
    # Get connection from async pool.
    # connect() is a coroutine, so it is awaited before entering `async with`.
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase',
        pool_name='async_pool',
        pool_size=5
    ) as connection:

        async with await connection.cursor() as cursor:
            await cursor.execute("SELECT SLEEP(%s)", (1,))
            await cursor.fetchone()

        print(f"Async worker {worker_id} completed")

async def main():
    """Run ten workers concurrently."""
    # Create multiple async tasks
    tasks = [worker_task(i) for i in range(10)]

    # Run tasks concurrently
    await asyncio.gather(*tasks)
    print("All async workers completed")

asyncio.run(main())
```

### Async Batch Processing

```python
import asyncio
from typing import Dict, List

import mysql.connector.aio

async def process_batch(batch_data: List[Dict]):
    """
    Process batch of data asynchronously.

    Each item must provide the keys 'id' and 'value'; all rows of the batch
    are inserted and then committed together.
    """
    # connect() is a coroutine, so it is awaited before entering `async with`
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        async with await connection.cursor() as cursor:
            # Prepare batch insert
            insert_query = "INSERT INTO processed_data (id, value, timestamp) VALUES (%s, %s, NOW())"
            rows = [(item['id'], item['value']) for item in batch_data]

            # Execute batch asynchronously
            await cursor.executemany(insert_query, rows)

        await connection.commit()
        print(f"Processed batch of {len(batch_data)} items")

async def main():
    """Insert three sample batches concurrently, one connection per batch."""
    # Sample data batches
    batches = [
        [{'id': 1, 'value': 'A'}, {'id': 2, 'value': 'B'}],
        [{'id': 3, 'value': 'C'}, {'id': 4, 'value': 'D'}],
        [{'id': 5, 'value': 'E'}, {'id': 6, 'value': 'F'}]
    ]

    # Process all batches concurrently
    tasks = [process_batch(batch) for batch in batches]
    await asyncio.gather(*tasks)
    print("All batches processed")

asyncio.run(main())
```

### Async Result Streaming

```python
import asyncio
import mysql.connector.aio

async def stream_large_dataset():
    """Stream large dataset asynchronously."""
    # connect() is a coroutine, so it is awaited before entering `async with`
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        async with await connection.cursor() as cursor:
            await cursor.execute("SELECT * FROM large_table ORDER BY id")

            # Stream results asynchronously
            count = 0
            async for row in cursor:
                # Process each row as it arrives
                print(f"Processing row {count}: {row[0]}")
                count += 1

                # Yield control to other tasks periodically
                if count % 1000 == 0:
                    await asyncio.sleep(0)  # Yield control

            print(f"Streamed {count} rows")

asyncio.run(stream_large_dataset())
```

### Async with Multiple Databases

```python
import asyncio
import mysql.connector.aio

async def sync_data_between_databases():
    """
    Sync data between two databases asynchronously.

    Connections and cursors are held by context managers, so each one is
    released even when a later connect, cursor creation or query fails.
    """

    # Connect to source database
    async with await mysql.connector.aio.connect(
        host='source.mysql.example.com',
        user='myuser',
        password='mypassword',
        database='source_db'
    ) as source_conn:

        # Connect to destination database
        async with await mysql.connector.aio.connect(
            host='dest.mysql.example.com',
            user='myuser',
            password='mypassword',
            database='dest_db'
        ) as dest_conn:

            # Get cursors for both connections
            async with await source_conn.cursor(dictionary=True) as source_cursor:
                async with await dest_conn.cursor() as dest_cursor:

                    # Read from source
                    await source_cursor.execute("SELECT * FROM users WHERE updated_at > %s", ('2024-01-01',))

                    # Process and insert to destination
                    async for user in source_cursor:
                        await dest_cursor.execute(
                            "INSERT INTO users (id, name, email) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE name=%s, email=%s",
                            (user['id'], user['name'], user['email'], user['name'], user['email'])
                        )

            # Commit destination changes
            await dest_conn.commit()
            print("Data sync completed")
    # Cursors and both connections are closed on leaving the blocks above

asyncio.run(sync_data_between_databases())
```

### Async with Timeout and Error Handling

```python
import asyncio
import mysql.connector.aio

async def query_with_timeout():
    """Execute query with timeout and error handling."""
    try:
        # Set timeout for entire operation.
        # NOTE: asyncio.timeout() requires Python 3.11 or newer.
        async with asyncio.timeout(30):  # 30 second timeout
            # connect() is a coroutine, so it is awaited before `async with`
            async with await mysql.connector.aio.connect(
                host='localhost',
                user='myuser',
                password='mypassword',
                database='mydatabase',
                connect_timeout=10  # Connection timeout
            ) as connection:

                async with await connection.cursor() as cursor:
                    # Long-running query
                    await cursor.execute("SELECT * FROM large_table WHERE complex_condition = %s", ('value',))

                    results = await cursor.fetchall()
                    print(f"Query returned {len(results)} rows")

    except asyncio.TimeoutError:
        print("Query timed out")
    except mysql.connector.Error as err:
        print(f"Database error: {err}")
    except Exception as err:
        print(f"Unexpected error: {err}")

asyncio.run(query_with_timeout())
```