or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async-operations.mdconnections.mdcursors.mddata-types.mderror-handling.mdindex.md

async-operations.mddocs/

0

# Asynchronous Operations

1

2

Asyncio-compatible database operations including async connections, cursors, and connection pooling for high-performance concurrent database access.

3

4

## Capabilities

5

6

### Async Connection Creation

7

8

Create asynchronous database connections that integrate with Python's asyncio event loop for non-blocking database operations.

9

10

```python { .api }

11

async def connect(host="localhost", user=None, passwd="", db=None, port=3306,

12

unix_socket=None, charset='', sql_mode=None, read_default_file=None,

13

client_flag=0, cursorclass=None, init_command=None, connect_timeout=None,

14

ssl=None, read_default_group=None, compress="", zstd_compression_level=3,

15

named_pipe=None, conv=None, encoders=None, loop=None):

16

"""

17

Create an asynchronous database connection.

18

19

Parameters:

20

- host (str): MySQL server hostname or IP address (default: "localhost")

21

- user (str): Username for authentication

22

- passwd (str): Password for authentication (default: "")

23

- port (int): MySQL server port number (default: 3306)

24

- db (str): Default database name

25

- unix_socket (str): Unix socket path for local connections

26

- charset (str): Character set for connection (default: '')

27

- sql_mode (str): SQL mode setting for connection

28

- read_default_file (str): MySQL configuration file path

29

- client_flag (int): Custom flags to send to MySQL

30

- cursorclass: Default cursor class for this connection

31

- init_command (str): SQL command to run on connection

32

- connect_timeout (int): Connection timeout in seconds

33

- ssl (dict): SSL configuration parameters

34

- read_default_group (str): Group to read from configuration file

35

- compress (str): Compression algorithm ("zlib" or "zstd")

36

- zstd_compression_level (int): ZSTD compression level (1-22, default: 3)

37

- named_pipe: Not supported (raises NotImplementedError)

38

- conv: Decoders dictionary for custom type marshalling

39

- encoders: Encoders dictionary for custom type marshalling

40

- loop: Event loop to use (default: current event loop)

41

42

Returns:

43

AsyncConnection: Asynchronous database connection object

44

45

Raises:

46

OperationalError: Connection failed

47

InterfaceError: Invalid connection parameters

48

"""

49

```

50

51

### Async Connection Management

52

53

The AsyncConnection class provides asynchronous methods for managing database connections and executing operations without blocking the event loop.

54

55

```python { .api }

56

class AsyncConnection:

57

async def cursor(self, cursor=None):

58

"""

59

Create a new async cursor object.

60

61

Parameters:

62

- cursor: Async cursor class to instantiate (optional)

63

64

Returns:

65

AsyncCursor: New async cursor object

66

"""

67

68

async def commit(self):

69

"""

70

Commit current transaction asynchronously.

71

72

Raises:

73

OperationalError: Transaction commit failed

74

"""

75

76

async def rollback(self):

77

"""

78

Roll back current transaction asynchronously.

79

80

Raises:

81

OperationalError: Transaction rollback failed

82

"""

83

84

def close(self):

85

"""

86

Close the database connection.

87

"""

88

89

async def ensure_closed(self):

90

"""

91

Ensure connection is properly closed and cleaned up.

92

"""

93

94

async def autocommit(self, value):

95

"""

96

Enable or disable autocommit mode asynchronously.

97

98

Parameters:

99

- value (bool): True to enable autocommit, False to disable

100

"""

101

102

async def ping(self):

103

"""

104

Check if connection to server is alive asynchronously.

105

106

Raises:

107

OperationalError: Connection check failed

108

"""

109

110

async def set_charset(self, charset):

111

"""

112

Set connection character set asynchronously.

113

114

Parameters:

115

- charset (str): Character set name

116

"""

117

```

118

119

### Async Cursor Operations

120

121

Asynchronous cursor providing non-blocking SQL execution and result retrieval.

122

123

```python { .api }

124

class AsyncCursor:

125

async def execute(self, query, args=None):

126

"""

127

Execute a SQL statement asynchronously.

128

129

Parameters:

130

- query (str): SQL statement with optional %s placeholders

131

- args (tuple/list/dict): Parameters to bind to placeholders

132

133

Returns:

134

int: Number of affected rows

135

136

Raises:

137

ProgrammingError: SQL syntax error

138

OperationalError: Database operation error

139

"""

140

141

async def executemany(self, query, args_list):

142

"""

143

Execute a SQL statement multiple times asynchronously.

144

145

Parameters:

146

- query (str): SQL statement with %s placeholders

147

- args_list (list/tuple): Sequence of parameter tuples/lists

148

149

Returns:

150

int: Number of affected rows from all executions

151

"""

152

153

async def fetchone(self):

154

"""

155

Fetch the next row asynchronously.

156

157

Returns:

158

tuple: Next row data, or None if no more rows

159

"""

160

161

async def fetchmany(self, size=None):

162

"""

163

Fetch multiple rows asynchronously.

164

165

Parameters:

166

- size (int): Number of rows to fetch (default: arraysize)

167

168

Returns:

169

list: List of row tuples

170

"""

171

172

async def fetchall(self):

173

"""

174

Fetch all remaining rows asynchronously.

175

176

Returns:

177

list: List of all remaining row tuples

178

"""

179

180

def close(self):

181

"""

182

Close the cursor and free resources.

183

"""

184

185

async def __aenter__(self):

186

"""Async context manager entry."""

187

188

async def __aexit__(self, exc_type, exc_val, exc_tb):

189

"""Async context manager exit."""

190

```

191

192

### Dictionary Cursor

193

194

Async cursor that returns rows as dictionaries instead of tuples.

195

196

```python { .api }

197

class AsyncDictCursor(AsyncCursor):

198

"""

199

Async cursor that returns rows as dictionaries with column names as keys.

200

201

Each row is returned as a dictionary mapping column names to values,

202

making it easier to access specific columns by name in async operations.

203

"""

204

205

async def execute(self, query, args=None):

206

"""Execute query and prepare field mapping for dictionary results."""

207

208

async def fetchone(self):

209

"""

210

Fetch the next row as a dictionary asynchronously.

211

212

Returns:

213

dict: Row data as {column_name: value} mapping, or None if no more rows

214

"""

215

216

async def fetchmany(self, size=None):

217

"""

218

Fetch multiple rows as dictionaries asynchronously.

219

220

Parameters:

221

- size (int): Number of rows to fetch (default: arraysize)

222

223

Returns:

224

tuple: Tuple of dictionaries representing rows

225

"""

226

227

async def fetchall(self):

228

"""

229

Fetch all remaining rows as dictionaries asynchronously.

230

231

Returns:

232

tuple: Tuple of dictionaries representing all remaining rows

233

"""

234

```

235

236

### Connection Pooling

237

238

Efficient connection pooling for high-throughput async applications with automatic connection lifecycle management.

239

240

```python { .api }

241

def create_pool(minsize=1, maxsize=10, pool_recycle=-1, loop=None, **kwargs):

242

"""

243

Create an async connection pool context manager.

244

245

Parameters:

246

- minsize (int): Minimum number of connections in pool

247

- maxsize (int): Maximum number of connections in pool

248

- pool_recycle (int): Connection recycle time in seconds (-1 = no recycle)

249

- loop: Event loop to use (default: current event loop)

250

- **kwargs: Connection parameters for pool connections

251

252

Returns:

253

PoolContextManager: Context manager for connection pool

254

"""

255

256

async def _create_pool(minsize=1, maxsize=10, pool_recycle=-1, loop=None, **kwargs):

257

"""

258

Create an async connection pool directly.

259

260

Returns:

261

Pool: Connection pool object

262

"""

263

264

class Pool:

265

"""

266

Async connection pool managing multiple database connections.

267

"""

268

269

def acquire(self):

270

"""

271

Get a connection from the pool.

272

273

Returns:

274

PoolAcquireContextManager: Context manager for pool connection

275

"""

276

277

async def _acquire(self):

278

"""

279

Acquire a connection from the pool directly.

280

281

Returns:

282

AsyncConnection: Database connection from pool

283

"""

284

285

def release(self, conn):

286

"""

287

Return a connection to the pool.

288

289

Parameters:

290

- conn: Connection to return to pool

291

"""

292

293

def close(self):

294

"""

295

Close the pool and all connections.

296

"""

297

298

async def wait_closed(self):

299

"""

300

Wait for pool closure to complete.

301

"""

302

303

@property

304

def size(self):

305

"""Current number of connections in pool."""

306

307

@property

308

def freesize(self):

309

"""Number of free connections in pool."""

310

```

311

312

## Usage Examples

313

314

### Basic Async Connection

315

316

```python

317

import asyncio

318

import cymysql.aio

319

320

async def basic_query():

321

conn = await cymysql.aio.connect(

322

host='localhost',

323

user='root',

324

password='password',

325

db='testdb'

326

)

327

328

cursor = await conn.cursor()

329

await cursor.execute("SELECT COUNT(*) FROM users")

330

331

result = await cursor.fetchone()

332

print(f"User count: {result[0]}")

333

334

cursor.close()

335

conn.close()

336

337

asyncio.run(basic_query())

338

```

339

340

### Async Context Managers

341

342

```python

343

import asyncio

344

import cymysql.aio

345

346

async def context_manager_example():

347

conn = await cymysql.aio.connect(

348

host='localhost',

349

user='root',

350

password='password',

351

db='testdb'

352

)

353

354

# Cursor context manager

355

async with conn.cursor() as cursor:

356

await cursor.execute("SELECT id, name FROM users WHERE active = %s", (True,))

357

358

async for row in cursor:

359

print(f"User: {row[1]} (ID: {row[0]})")

360

361

conn.close()

362

363

asyncio.run(context_manager_example())

364

```

365

366

### Connection Pool Usage

367

368

```python

369

import asyncio

370

import cymysql.aio

371

372

async def pool_example():

373

# Create connection pool

374

pool = await cymysql.aio.create_pool(

375

host='localhost',

376

user='root',

377

password='password',

378

db='testdb',

379

minsize=1,

380

maxsize=10

381

)

382

383

# Use pool connection

384

async with pool.acquire() as conn:

385

async with conn.cursor() as cursor:

386

await cursor.execute("SELECT VERSION()")

387

version = await cursor.fetchone()

388

print(f"MySQL version: {version[0]}")

389

390

# Clean up pool

391

pool.close()

392

await pool.wait_closed()

393

394

asyncio.run(pool_example())

395

```

396

397

### Concurrent Database Operations

398

399

```python

400

import asyncio

401

import cymysql.aio

402

403

async def fetch_user_data(pool, user_id):

404

async with pool.acquire() as conn:

405

async with conn.cursor(cymysql.aio.AsyncDictCursor) as cursor:

406

await cursor.execute(

407

"SELECT id, name, email FROM users WHERE id = %s",

408

(user_id,)

409

)

410

return await cursor.fetchone()

411

412

async def concurrent_example():

413

pool = await cymysql.aio.create_pool(

414

host='localhost',

415

user='root',

416

password='password',

417

db='testdb',

418

minsize=5,

419

maxsize=20

420

)

421

422

# Fetch multiple users concurrently

423

user_ids = [1, 2, 3, 4, 5]

424

tasks = [fetch_user_data(pool, uid) for uid in user_ids]

425

426

users = await asyncio.gather(*tasks)

427

428

for user in users:

429

if user:

430

print(f"User: {user['name']} ({user['email']})")

431

432

pool.close()

433

await pool.wait_closed()

434

435

asyncio.run(concurrent_example())

436

```

437

438

### Async Transaction Management

439

440

```python

441

import asyncio

442

import cymysql.aio

443

444

async def transfer_funds(pool, from_account, to_account, amount):

445

async with pool.acquire() as conn:

446

try:

447

# Disable autocommit for transaction

448

await conn.autocommit(False)

449

450

async with conn.cursor() as cursor:

451

# Debit from source account

452

await cursor.execute(

453

"UPDATE accounts SET balance = balance - %s WHERE id = %s",

454

(amount, from_account)

455

)

456

457

# Credit to destination account

458

await cursor.execute(

459

"UPDATE accounts SET balance = balance + %s WHERE id = %s",

460

(amount, to_account)

461

)

462

463

# Commit transaction

464

await conn.commit()

465

print(f"Transfer of ${amount} completed successfully")

466

467

except Exception as e:

468

# Rollback on error

469

await conn.rollback()

470

print(f"Transfer failed: {e}")

471

finally:

472

# Re-enable autocommit

473

await conn.autocommit(True)

474

475

async def transaction_example():

476

pool = await cymysql.aio.create_pool(

477

host='localhost',

478

user='root',

479

password='password',

480

db='banking'

481

)

482

483

await transfer_funds(pool, from_account=1, to_account=2, amount=100.00)

484

485

pool.close()

486

await pool.wait_closed()

487

488

asyncio.run(transaction_example())

489

```

490

491

### Streaming Large Result Sets

492

493

```python

494

import asyncio

495

import cymysql.aio

496

497

async def process_large_dataset(pool):

498

async with pool.acquire() as conn:

499

async with conn.cursor() as cursor:

500

await cursor.execute("SELECT * FROM large_table")

501

502

# Process results in batches to avoid memory issues

503

while True:

504

rows = await cursor.fetchmany(1000)

505

if not rows:

506

break

507

508

# Process batch

509

for row in rows:

510

await process_row_async(row)

511

512

print(f"Processed batch of {len(rows)} rows")

513

514

async def process_row_async(row):

515

# Simulate async processing

516

await asyncio.sleep(0.001)

517

518

asyncio.run(process_large_dataset(pool))

519

```

520

521

### Error Handling in Async Operations

522

523

```python

524

import asyncio

525

import cymysql.aio

526

from cymysql import OperationalError, ProgrammingError

527

528

async def robust_async_operation():

529

pool = None

530

try:

531

pool = await cymysql.aio.create_pool(

532

host='localhost',

533

user='root',

534

password='password',

535

db='testdb',

536

minsize=1,

537

maxsize=5

538

)

539

540

async with pool.acquire() as conn:

541

async with conn.cursor() as cursor:

542

await cursor.execute("SELECT COUNT(*) FROM nonexistent_table")

543

result = await cursor.fetchone()

544

545

except OperationalError as e:

546

print(f"Database operation error: {e}")

547

except ProgrammingError as e:

548

print(f"SQL programming error: {e}")

549

except Exception as e:

550

print(f"Unexpected error: {e}")

551

finally:

552

if pool:

553

pool.close()

554

await pool.wait_closed()

555

556

asyncio.run(robust_async_operation())

557

```

558

559

## Performance Best Practices

560

561

- Use connection pooling for applications with multiple concurrent database operations

562

- Set appropriate `minsize` and `maxsize` for your application's concurrency needs

563

- Use `fetchmany()` for large result sets to control memory usage

564

- Properly close connections and pools to avoid resource leaks

565

- Consider `pool_recycle` parameter for long-running applications

566

- Use async context managers for automatic resource cleanup