# Asynchronous Operations

Asyncio-compatible database operations including async connections, cursors, and connection pooling for high-performance concurrent database access.

## Capabilities

### Async Connection Creation

Create asynchronous database connections that integrate with Python's asyncio event loop for non-blocking database operations.

```python { .api }
async def connect(host="localhost", user=None, passwd="", db=None, port=3306,
                  unix_socket=None, charset='', sql_mode=None, read_default_file=None,
                  client_flag=0, cursorclass=None, init_command=None, connect_timeout=None,
                  ssl=None, read_default_group=None, compress="", zstd_compression_level=3,
                  named_pipe=None, conv=None, encoders=None, loop=None):
    """
    Create an asynchronous database connection.

    Parameters:
    - host (str): MySQL server hostname or IP address (default: "localhost")
    - user (str): Username for authentication
    - passwd (str): Password for authentication (default: "")
    - port (int): MySQL server port number (default: 3306)
    - db (str): Default database name
    - unix_socket (str): Unix socket path for local connections
    - charset (str): Character set for connection (default: '')
    - sql_mode (str): SQL mode setting for connection
    - read_default_file (str): MySQL configuration file path
    - client_flag (int): Custom flags to send to MySQL
    - cursorclass: Default cursor class for this connection
    - init_command (str): SQL command to run on connection
    - connect_timeout (int): Connection timeout in seconds
    - ssl (dict): SSL configuration parameters
    - read_default_group (str): Group to read from configuration file
    - compress (str): Compression algorithm ("zlib" or "zstd")
    - zstd_compression_level (int): ZSTD compression level (1-22, default: 3)
    - named_pipe: Not supported (raises NotImplementedError)
    - conv: Decoders dictionary for custom type marshalling
    - encoders: Encoders dictionary for custom type marshalling
    - loop: Event loop to use (default: current event loop)

    Returns:
    AsyncConnection: Asynchronous database connection object

    Raises:
    OperationalError: Connection failed
    InterfaceError: Invalid connection parameters
    """
```

### Async Connection Management

The AsyncConnection class provides asynchronous methods for managing database connections and executing operations without blocking the event loop.

```python { .api }
class AsyncConnection:
    async def cursor(self, cursor=None):
        """
        Create a new async cursor object.

        Parameters:
        - cursor: Async cursor class to instantiate (optional)

        Returns:
        AsyncCursor: New async cursor object
        """

    async def commit(self):
        """
        Commit current transaction asynchronously.

        Raises:
        OperationalError: Transaction commit failed
        """

    async def rollback(self):
        """
        Roll back current transaction asynchronously.

        Raises:
        OperationalError: Transaction rollback failed
        """

    def close(self):
        """
        Close the database connection.
        """

    async def ensure_closed(self):
        """
        Ensure connection is properly closed and cleaned up.
        """

    async def autocommit(self, value):
        """
        Enable or disable autocommit mode asynchronously.

        Parameters:
        - value (bool): True to enable autocommit, False to disable
        """

    async def ping(self):
        """
        Check if connection to server is alive asynchronously.

        Raises:
        OperationalError: Connection check failed
        """

    async def set_charset(self, charset):
        """
        Set connection character set asynchronously.

        Parameters:
        - charset (str): Character set name
        """
```

### Async Cursor Operations

Asynchronous cursor providing non-blocking SQL execution and result retrieval.

```python { .api }
class AsyncCursor:
    async def execute(self, query, args=None):
        """
        Execute a SQL statement asynchronously.

        Parameters:
        - query (str): SQL statement with optional %s placeholders
        - args (tuple/list/dict): Parameters to bind to placeholders

        Returns:
        int: Number of affected rows

        Raises:
        ProgrammingError: SQL syntax error
        OperationalError: Database operation error
        """

    async def executemany(self, query, args_list):
        """
        Execute a SQL statement multiple times asynchronously.

        Parameters:
        - query (str): SQL statement with %s placeholders
        - args_list (list/tuple): Sequence of parameter tuples/lists

        Returns:
        int: Number of affected rows from all executions
        """

    async def fetchone(self):
        """
        Fetch the next row asynchronously.

        Returns:
        tuple: Next row data, or None if no more rows
        """

    async def fetchmany(self, size=None):
        """
        Fetch multiple rows asynchronously.

        Parameters:
        - size (int): Number of rows to fetch (default: arraysize)

        Returns:
        list: List of row tuples
        """

    async def fetchall(self):
        """
        Fetch all remaining rows asynchronously.

        Returns:
        list: List of all remaining row tuples
        """

    def close(self):
        """
        Close the cursor and free resources.
        """

    async def __aenter__(self):
        """Async context manager entry."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit."""
```

### Dictionary Cursor

Async cursor that returns rows as dictionaries instead of tuples.

```python { .api }
class AsyncDictCursor(AsyncCursor):
    """
    Async cursor that returns rows as dictionaries with column names as keys.

    Each row is returned as a dictionary mapping column names to values,
    making it easier to access specific columns by name in async operations.
    """

    async def execute(self, query, args=None):
        """Execute query and prepare field mapping for dictionary results."""

    async def fetchone(self):
        """
        Fetch the next row as a dictionary asynchronously.

        Returns:
        dict: Row data as {column_name: value} mapping, or None if no more rows
        """

    async def fetchmany(self, size=None):
        """
        Fetch multiple rows as dictionaries asynchronously.

        Parameters:
        - size (int): Number of rows to fetch (default: arraysize)

        Returns:
        tuple: Tuple of dictionaries representing rows
        """

    async def fetchall(self):
        """
        Fetch all remaining rows as dictionaries asynchronously.

        Returns:
        tuple: Tuple of dictionaries representing all remaining rows
        """
```

### Connection Pooling

Efficient connection pooling for high-throughput async applications with automatic connection lifecycle management.

```python { .api }
def create_pool(minsize=1, maxsize=10, pool_recycle=-1, loop=None, **kwargs):
    """
    Create an async connection pool context manager.

    Parameters:
    - minsize (int): Minimum number of connections in pool
    - maxsize (int): Maximum number of connections in pool
    - pool_recycle (int): Connection recycle time in seconds (-1 = no recycle)
    - loop: Event loop to use (default: current event loop)
    - **kwargs: Connection parameters for pool connections

    Returns:
    PoolContextManager: Context manager for connection pool
    """

async def _create_pool(minsize=1, maxsize=10, pool_recycle=-1, loop=None, **kwargs):
    """
    Create an async connection pool directly.

    Returns:
    Pool: Connection pool object
    """

class Pool:
    """
    Async connection pool managing multiple database connections.
    """

    def acquire(self):
        """
        Get a connection from the pool.

        Returns:
        PoolAcquireContextManager: Context manager for pool connection
        """

    async def _acquire(self):
        """
        Acquire a connection from the pool directly.

        Returns:
        AsyncConnection: Database connection from pool
        """

    def release(self, conn):
        """
        Return a connection to the pool.

        Parameters:
        - conn: Connection to return to pool
        """

    def close(self):
        """
        Close the pool and all connections.
        """

    async def wait_closed(self):
        """
        Wait for pool closure to complete.
        """

    @property
    def size(self):
        """Current number of connections in pool."""

    @property
    def freesize(self):
        """Number of free connections in pool."""
```

## Usage Examples

### Basic Async Connection

```python
import asyncio
import cymysql.aio

async def basic_query():
    # Keyword names follow the connect() signature documented above
    conn = await cymysql.aio.connect(
        host='localhost',
        user='root',
        passwd='password',
        db='testdb'
    )

    cursor = await conn.cursor()
    await cursor.execute("SELECT COUNT(*) FROM users")

    result = await cursor.fetchone()
    print(f"User count: {result[0]}")

    cursor.close()
    conn.close()

asyncio.run(basic_query())
```

### Async Context Managers

```python
import asyncio
import cymysql.aio

async def context_manager_example():
    conn = await cymysql.aio.connect(
        host='localhost',
        user='root',
        passwd='password',
        db='testdb'
    )

    # Cursor context manager
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT id, name FROM users WHERE active = %s", (True,))

        async for row in cursor:
            print(f"User: {row[1]} (ID: {row[0]})")

    conn.close()

asyncio.run(context_manager_example())
```

### Connection Pool Usage

```python
import asyncio
import cymysql.aio

async def pool_example():
    # Create connection pool
    pool = await cymysql.aio.create_pool(
        host='localhost',
        user='root',
        passwd='password',
        db='testdb',
        minsize=1,
        maxsize=10
    )

    # Use pool connection
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT VERSION()")
            version = await cursor.fetchone()
            print(f"MySQL version: {version[0]}")

    # Clean up pool
    pool.close()
    await pool.wait_closed()

asyncio.run(pool_example())
```

### Concurrent Database Operations

```python
import asyncio
import cymysql.aio

async def fetch_user_data(pool, user_id):
    # Each task borrows its own connection from the pool
    async with pool.acquire() as conn:
        async with conn.cursor(cymysql.aio.AsyncDictCursor) as cursor:
            await cursor.execute(
                "SELECT id, name, email FROM users WHERE id = %s",
                (user_id,)
            )
            return await cursor.fetchone()

async def concurrent_example():
    pool = await cymysql.aio.create_pool(
        host='localhost',
        user='root',
        passwd='password',
        db='testdb',
        minsize=5,
        maxsize=20
    )

    # Fetch multiple users concurrently
    user_ids = [1, 2, 3, 4, 5]
    tasks = [fetch_user_data(pool, uid) for uid in user_ids]

    users = await asyncio.gather(*tasks)

    for user in users:
        if user:
            print(f"User: {user['name']} ({user['email']})")

    pool.close()
    await pool.wait_closed()

asyncio.run(concurrent_example())
```
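
When the number of tasks can exceed the pool's `maxsize`, it may help to cap how many run at once rather than letting every task queue on `pool.acquire()`. The following is a minimal sketch that uses only `asyncio`. The names `gather_bounded` and `fake_fetch` are hypothetical and not part of cymysql; `fake_fetch` stands in for a function such as `fetch_user_data` so that the snippet runs without a database.

```python
import asyncio


async def gather_bounded(coro_fns, limit):
    """Run zero-argument coroutine functions with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(fn):
        # Only `limit` tasks can hold the semaphore at the same time
        async with semaphore:
            return await fn()

    # gather() returns results in the same order as its inputs
    return await asyncio.gather(*(run_one(fn) for fn in coro_fns))


async def fake_fetch(user_id):
    # Hypothetical stand-in for a pooled database query
    await asyncio.sleep(0)
    return {"id": user_id}


async def demo():
    # Bind uid as a default argument so each lambda keeps its own value
    jobs = [lambda uid=uid: fake_fetch(uid) for uid in range(1, 6)]
    return await gather_bounded(jobs, limit=2)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real pool, one plausible choice is to set `limit` no higher than the pool's `maxsize`, so that tasks wait on the semaphore instead of on the pool.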

### Async Transaction Management

```python
import asyncio
import cymysql.aio

async def transfer_funds(pool, from_account, to_account, amount):
    async with pool.acquire() as conn:
        try:
            # Disable autocommit for transaction
            await conn.autocommit(False)

            async with conn.cursor() as cursor:
                # Debit from source account
                await cursor.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, from_account)
                )

                # Credit to destination account
                await cursor.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to_account)
                )

            # Commit transaction
            await conn.commit()
            print(f"Transfer of ${amount} completed successfully")

        except Exception as e:
            # Rollback on error
            await conn.rollback()
            print(f"Transfer failed: {e}")
        finally:
            # Re-enable autocommit
            await conn.autocommit(True)

async def transaction_example():
    pool = await cymysql.aio.create_pool(
        host='localhost',
        user='root',
        passwd='password',
        db='banking'
    )

    await transfer_funds(pool, from_account=1, to_account=2, amount=100.00)

    pool.close()
    await pool.wait_closed()

asyncio.run(transaction_example())
```
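
The commit, rollback, and autocommit steps above can be factored into a reusable async context manager. This is a sketch under stated assumptions, not part of cymysql: `transaction` is a hypothetical helper that relies only on the `autocommit()`, `commit()`, and `rollback()` methods documented for `AsyncConnection`, and `FakeConnection` is a hypothetical stub that records calls so the snippet runs without a server.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def transaction(conn):
    """Commit on success, roll back on any exception, then re-raise it."""
    await conn.autocommit(False)
    try:
        yield conn
        await conn.commit()
    except BaseException:
        # Also covers task cancellation, so partial work is not left pending
        await conn.rollback()
        raise
    finally:
        await conn.autocommit(True)


class FakeConnection:
    """Hypothetical stub that records calls instead of talking to MySQL."""

    def __init__(self):
        self.calls = []

    async def autocommit(self, value):
        self.calls.append(f"autocommit({value})")

    async def commit(self):
        self.calls.append("commit")

    async def rollback(self):
        self.calls.append("rollback")


async def demo():
    ok, failed = FakeConnection(), FakeConnection()
    async with transaction(ok):
        pass  # statements would be executed here
    try:
        async with transaction(failed):
            raise ValueError("simulated failure")
    except ValueError:
        pass
    return ok.calls, failed.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Unlike `transfer_funds`, this helper re-raises the original exception, which leaves the decision about logging or retrying to the caller.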

### Streaming Large Result Sets

```python
import asyncio
import cymysql.aio

async def process_large_dataset(pool):
    async with pool.acquire() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute("SELECT * FROM large_table")

            # Process results in batches to avoid memory issues
            while True:
                rows = await cursor.fetchmany(1000)
                if not rows:
                    break

                # Process batch
                for row in rows:
                    await process_row_async(row)

                print(f"Processed batch of {len(rows)} rows")

async def process_row_async(row):
    # Simulate async processing
    await asyncio.sleep(0.001)

async def main():
    # Create the pool that process_large_dataset() draws connections from
    pool = await cymysql.aio.create_pool(
        host='localhost',
        user='root',
        passwd='password',
        db='testdb'
    )
    try:
        await process_large_dataset(pool)
    finally:
        # Close the pool even if processing fails
        pool.close()
        await pool.wait_closed()

asyncio.run(main())
```
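
The `fetchmany()` loop above can also be wrapped in an async generator so that callers iterate over batches with `async for`. The sketch below assumes only the documented `fetchmany(size)` coroutine. `iter_batches` is a hypothetical helper, and `FakeCursor` is a hypothetical in-memory stand-in used to make the snippet runnable without a database.

```python
import asyncio


async def iter_batches(cursor, size=1000):
    """Yield batches of rows until fetchmany() returns an empty result."""
    while True:
        rows = await cursor.fetchmany(size)
        if not rows:
            return
        yield rows


class FakeCursor:
    """Hypothetical in-memory cursor used only to exercise iter_batches()."""

    def __init__(self, rows):
        self._rows = list(rows)

    async def fetchmany(self, size=None):
        # Hand out the next `size` rows and keep the remainder
        batch, self._rows = self._rows[:size], self._rows[size:]
        return batch


async def demo():
    cursor = FakeCursor(range(5))
    return [len(batch) async for batch in iter_batches(cursor, size=2)]


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [2, 2, 1]
```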

### Error Handling in Async Operations

```python
import asyncio
import cymysql.aio
from cymysql import OperationalError, ProgrammingError

async def robust_async_operation():
    pool = None
    try:
        pool = await cymysql.aio.create_pool(
            host='localhost',
            user='root',
            passwd='password',
            db='testdb',
            minsize=1,
            maxsize=5
        )

        async with pool.acquire() as conn:
            async with conn.cursor() as cursor:
                await cursor.execute("SELECT COUNT(*) FROM nonexistent_table")
                result = await cursor.fetchone()

    except OperationalError as e:
        print(f"Database operation error: {e}")
    except ProgrammingError as e:
        print(f"SQL programming error: {e}")
    except Exception as e:
        print(f"Unexpected error: {e}")
    finally:
        # Close the pool only if it was created successfully
        if pool:
            pool.close()
            await pool.wait_closed()

asyncio.run(robust_async_operation())
```
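
Some failures, such as a dropped connection, may be transient and worth retrying instead of only logging. The following is a minimal sketch of retry with exponential backoff using only `asyncio`. `retry_async` and its parameters are hypothetical names, not a cymysql API, and the demo raises the built-in `ConnectionError` so it runs without a database. With cymysql, one might pass `retry_on=OperationalError`.

```python
import asyncio


async def retry_async(operation, retry_on, attempts=3, base_delay=0.1):
    """Await operation(), retrying on `retry_on` with exponential backoff."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Delay doubles each time: base_delay, 2 * base_delay, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


async def demo():
    calls = 0

    async def flaky():
        # Hypothetical operation that fails twice before succeeding
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("simulated transient failure")
        return "ok"

    result = await retry_async(flaky, retry_on=ConnectionError, base_delay=0)
    return result, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ('ok', 3)
```

Retrying is generally only safe for idempotent statements. A write that may already have been applied should not be re-sent blindly.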

## Performance Best Practices

- Use connection pooling for applications with multiple concurrent database operations
- Set `minsize` and `maxsize` to match your application's expected concurrency
- Use `fetchmany()` for large result sets to control memory usage
- Close connections and pools when finished to avoid resource leaks
- Consider setting the `pool_recycle` parameter for long-running applications
- Use async context managers for automatic resource cleanup