or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.md auth.md connection.md constants.md cursors.md errors.md index.md pooling.md types.md utilities.md

async.mddocs/

0

# Asynchronous Operations

1

2

Perform database operations asynchronously using asyncio with full async/await support, providing non-blocking database access for high-concurrency applications.

3

4

## Async Connection Management

5

6

### Async Connection Functions

7

8

```python { .api }

9

import mysql.connector.aio

10

11

async def connect(**kwargs) -> 'MySQLConnection':

12

"""

13

Create async connection to MySQL server.

14

15

Returns:

16

MySQLConnection instance for async operations

17

"""

18

pass

19

```

20

21

### MySQLConnection (Async)

22

23

```python { .api }

24

class MySQLConnection:

25

"""

26

Async connection class with asyncio support.

27

Provides non-blocking database operations using async/await.

28

"""

29

30

def __init__(self, **kwargs) -> None:

31

"""Initialize async connection with configuration."""

32

pass

33

34

async def connect(self) -> None:

35

"""Establish async connection to MySQL server."""

36

pass

37

38

async def disconnect(self) -> None:

39

"""Close async connection to MySQL server."""

40

pass

41

42

async def close(self) -> None:

43

"""Close connection (alias for disconnect)."""

44

pass

45

46

def is_connected(self) -> bool:

47

"""Check if connection is active (non-blocking check)."""

48

pass

49

50

async def ping(self, reconnect: bool = False, attempts: int = 1, delay: int = 0) -> None:

51

"""Test connection to server asynchronously."""

52

pass

53

54

async def reconnect(self, attempts: int = 1, delay: int = 0) -> None:

55

"""Reconnect to MySQL server asynchronously."""

56

pass

57

58

async def cursor(self,
                 buffered: Optional[bool] = None,
                 raw: Optional[bool] = None,
                 prepared: Optional[bool] = None,
                 cursor_class: Optional[Type] = None,
                 dictionary: Optional[bool] = None) -> 'MySQLCursor':
    """Create async cursor for executing SQL statements.

    This is a coroutine: obtain the cursor with
    ``cursor = await connection.cursor()`` or
    ``async with await connection.cursor() as cursor:``.
    """
    pass

66

67

async def commit(self) -> None:

68

"""Commit current transaction asynchronously."""

69

pass

70

71

async def rollback(self) -> None:

72

"""Rollback current transaction asynchronously."""

73

pass

74

75

async def start_transaction(self,

76

consistent_snapshot: bool = False,

77

isolation_level: Optional[str] = None,

78

readonly: Optional[bool] = None) -> None:

79

"""Start new transaction asynchronously."""

80

pass

81

82

@property

83

def autocommit(self) -> bool:

84

"""Get autocommit mode status."""

85

pass

86

87

async def set_autocommit(self, value: bool) -> None:

88

"""Set autocommit mode asynchronously."""

89

pass

90

91

@property

92

def database(self) -> str:

93

"""Get current database name."""

94

pass

95

96

async def set_database(self, value: str) -> None:

97

"""Change current database asynchronously."""

98

pass

99

100

@property

101

def server_version(self) -> Tuple[int, int, int]:

102

"""Get MySQL server version tuple."""

103

pass

104

105

@property

106

def connection_id(self) -> int:

107

"""Get MySQL connection ID."""

108

pass

109

110

@property

111

def charset(self) -> str:

112

"""Get connection character set."""

113

pass

114

115

async def set_charset(self, value: str) -> None:

116

"""Set connection character set asynchronously."""

117

pass

118

119

async def cmd_query(self, query: Union[str, bytes]) -> Dict:

120

"""Execute query asynchronously and return raw result."""

121

pass

122

123

async def cmd_quit(self) -> bytes:

124

"""Send quit command to server asynchronously."""

125

pass

126

127

async def cmd_init_db(self, database: str) -> bytes:

128

"""Send init_db command to change database asynchronously."""

129

pass

130

131

async def cmd_refresh(self, options: int) -> bytes:

132

"""Send refresh command asynchronously."""

133

pass

134

135

async def cmd_statistics(self) -> Dict:

136

"""Get server statistics asynchronously."""

137

pass

138

139

async def cmd_ping(self) -> bytes:

140

"""Send ping command to server asynchronously."""

141

pass

142

143

async def reset_session(self,

144

user_variables: Optional[Dict] = None,

145

session_variables: Optional[Dict] = None) -> None:

146

"""Reset session to initial state asynchronously."""

147

pass

148

149

async def get_warnings(self, count: Optional[int] = None) -> List[Tuple]:

150

"""Get warning messages from last statement asynchronously."""

151

pass

152

153

@property

154

def warning_count(self) -> int:

155

"""Get warning count from last statement."""

156

pass

157

158

@property

159

def info_msg(self) -> Optional[str]:

160

"""Get info message from last statement."""

161

pass

162

163

@property

164

def insert_id(self) -> int:

165

"""Get auto-generated ID from last INSERT."""

166

pass

167

168

@property

169

def affected_rows(self) -> int:

170

"""Get affected row count from last statement."""

171

pass

172

173

@property

174

def in_transaction(self) -> bool:

175

"""Check if connection is in transaction."""

176

pass

177

178

async def __aenter__(self) -> 'MySQLConnection':

179

"""Async context manager entry."""

180

pass

181

182

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

183

"""Async context manager exit with automatic cleanup."""

184

pass

185

```

186

187

### MySQLConnectionAbstract

188

189

```python { .api }

190

class MySQLConnectionAbstract:

191

"""

192

Abstract base class for async connections.

193

Defines interface for async connection implementations.

194

"""

195

pass

196

```

197

198

## Async Cursor Operations

199

200

### MySQLCursor (Async)

201

202

```python { .api }

203

class MySQLCursor:

204

"""

205

Async cursor for executing SQL statements.

206

Provides non-blocking query execution and result fetching.

207

"""

208

209

async def execute(self, operation: str, params: Optional[Union[Sequence, Dict]] = None, multi: bool = False) -> Optional[AsyncIterator]:

210

"""Execute SQL statement asynchronously with optional parameters."""

211

pass

212

213

async def executemany(self, operation: str, seq_params: Sequence[Union[Sequence, Dict]]) -> None:

214

"""Execute SQL statement multiple times asynchronously."""

215

pass

216

217

async def fetchone(self) -> Optional[Tuple]:

218

"""Fetch next row from result set asynchronously."""

219

pass

220

221

async def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:

222

"""Fetch specified number of rows asynchronously."""

223

pass

224

225

async def fetchall(self) -> List[Tuple]:

226

"""Fetch all remaining rows asynchronously."""

227

pass

228

229

async def close(self) -> None:

230

"""Close cursor and free resources asynchronously."""

231

pass

232

233

async def callproc(self, procname: str, args: Sequence = ()) -> Optional[Dict]:

234

"""Call stored procedure asynchronously."""

235

pass

236

237

def stored_results(self) -> AsyncIterator['MySQLCursor']:

238

"""Return async iterator for stored procedure result sets."""

239

pass

240

241

async def nextset(self) -> Optional[bool]:

242

"""Skip to next result set asynchronously."""

243

pass

244

245

@property

246

def description(self) -> Optional[List[Tuple]]:

247

"""Column metadata for last executed query."""

248

pass

249

250

@property

251

def rowcount(self) -> int:

252

"""Number of rows affected by last operation."""

253

pass

254

255

@property

256

def lastrowid(self) -> Optional[int]:

257

"""Auto-generated ID from last INSERT operation."""

258

pass

259

260

@property

261

def arraysize(self) -> int:

262

"""Default number of rows fetchmany() should return."""

263

pass

264

265

@arraysize.setter

266

def arraysize(self, value: int) -> None:

267

"""Set default fetchmany() size."""

268

pass

269

270

@property

271

def statement(self) -> Optional[str]:

272

"""Last executed SQL statement."""

273

pass

274

275

@property

276

def with_rows(self) -> bool:

277

"""Whether last operation produced result rows."""

278

pass

279

280

@property

281

def column_names(self) -> Tuple[str, ...]:

282

"""Column names from result set."""

283

pass

284

285

def __aiter__(self) -> 'MySQLCursor':

286

"""Make cursor async iterable over result rows."""

287

pass

288

289

async def __anext__(self) -> Tuple:

290

"""Get next row for async iteration."""

291

pass

292

293

async def __aenter__(self) -> 'MySQLCursor':

294

"""Async context manager entry."""

295

pass

296

297

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

298

"""Async context manager exit with automatic cleanup."""

299

pass

300

```

301

302

## Async Connection Pooling

303

304

### MySQLConnectionPool (Async)

305

306

```python { .api }

307

class MySQLConnectionPool:

308

"""

309

Async connection pool manager.

310

Manages pool of async database connections for efficient reuse.

311

"""

312

313

def __init__(self,

314

pool_name: Optional[str] = None,

315

pool_size: int = 5,

316

pool_reset_session: bool = True,

317

**kwargs) -> None:

318

"""Initialize async connection pool."""

319

pass

320

321

async def get_connection(self) -> 'PooledMySQLConnection':

322

"""Get connection from pool asynchronously."""

323

pass

324

325

async def add_connection(self, cnx: Optional['MySQLConnection'] = None) -> 'PooledMySQLConnection':

326

"""Add connection to pool asynchronously."""

327

pass

328

329

def set_config(self, **kwargs) -> None:

330

"""Update pool configuration."""

331

pass

332

333

async def close(self) -> None:

334

"""Close all connections in pool asynchronously."""

335

pass

336

337

@property

338

def pool_name(self) -> str:

339

"""Pool name identifier."""

340

pass

341

342

@property

343

def pool_size(self) -> int:

344

"""Maximum pool size."""

345

pass

346

```

347

348

### PooledMySQLConnection (Async)

349

350

```python { .api }

351

class PooledMySQLConnection:

352

"""

353

Async pooled connection wrapper.

354

Returns connection to pool on close in async context.

355

"""

356

357

def __init__(self, pool: MySQLConnectionPool, cnx: 'MySQLConnection') -> None:

358

"""Initialize async pooled connection wrapper."""

359

pass

360

361

async def close(self) -> None:

362

"""Return connection to pool asynchronously."""

363

pass

364

365

@property

366

def pool_name(self) -> str:

367

"""Name of the connection pool."""

368

pass

369

370

async def __aenter__(self) -> 'PooledMySQLConnection':

371

"""Async context manager entry."""

372

pass

373

374

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

375

"""Async context manager exit returning connection to pool."""

376

pass

377

```

378

379

### Async Pooling Functions

380

381

```python { .api }

382

async def connect(**kwargs) -> Union[MySQLConnection, PooledMySQLConnection]:

383

"""

384

Create async database connection with optional pooling.

385

386

When pool parameters are provided, returns PooledMySQLConnection.

387

Otherwise returns MySQLConnection.

388

"""

389

pass

390

```

391

392

## Usage Examples

393

394

### Basic Async Connection

395

396

```python

397

import asyncio

398

import mysql.connector.aio

399

400

async def main():
    """Connect, run one parameterized query, print each row, then clean up."""
    # Create async connection (connect() is a coroutine, so it is awaited)
    connection = await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    )
    try:
        # Create async cursor (cursor() is a coroutine as well)
        cursor = await connection.cursor()
        try:
            # Execute query asynchronously
            await cursor.execute("SELECT id, name FROM users WHERE age > %s", (25,))

            # Fetch results asynchronously
            async for user_id, name in cursor:
                print(f"User {user_id}: {name}")
        finally:
            # Cleanup runs even if the query or iteration fails
            await cursor.close()
    finally:
        await connection.close()

422

423

# Run async function

424

asyncio.run(main())

425

```

426

427

### Async Context Managers

428

429

```python

430

import asyncio

431

import mysql.connector.aio

432

433

async def main():
    """Count users, letting async context managers close cursor and connection."""
    # Automatic async connection cleanup.
    # connect() is a coroutine: await it first, then enter the connection
    # it returns as the async context manager.
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        # Automatic async cursor cleanup (cursor() must be awaited too)
        async with await connection.cursor(dictionary=True) as cursor:
            await cursor.execute("SELECT COUNT(*) as total FROM users")
            result = await cursor.fetchone()
            print(f"Total users: {result['total']}")
        # Cursor automatically closed
    # Connection automatically closed

449

450

asyncio.run(main())

451

```

452

453

### Async Transaction Management

454

455

```python

456

import asyncio

457

import mysql.connector.aio

458

459

async def transfer_funds(from_account: int, to_account: int, amount: float):
    """Move ``amount`` from one account to another in a single transaction.

    Args:
        from_account: id of the account to debit.
        to_account: id of the account to credit.
        amount: value subtracted from the source and added to the destination.

    Raises:
        ValueError: if either UPDATE affects no row.
        Exception: any other error is re-raised after the rollback.
    """
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        try:
            # Start transaction
            await connection.start_transaction()

            async with await connection.cursor() as cursor:
                # Debit from account
                await cursor.execute(
                    "UPDATE accounts SET balance = balance - %s WHERE id = %s",
                    (amount, from_account)
                )
                # rowcount only describes the most recent statement, so it
                # has to be checked after each UPDATE, not once at the end.
                if cursor.rowcount == 0:
                    raise ValueError("Account not found")

                # Credit to account
                await cursor.execute(
                    "UPDATE accounts SET balance = balance + %s WHERE id = %s",
                    (amount, to_account)
                )
                if cursor.rowcount == 0:
                    raise ValueError("Account not found")

            # Commit transaction
            await connection.commit()
            print(f"Transferred {amount} from {from_account} to {to_account}")

        except Exception as err:
            # Rollback on error, then let the caller see the failure
            await connection.rollback()
            print(f"Transfer failed: {err}")
            raise

497

498

asyncio.run(transfer_funds(1, 2, 100.0))

499

```

500

501

### Async Connection Pooling

502

503

```python

504

import asyncio

505

import mysql.connector.aio

506

507

async def worker_task(worker_id: int):
    """Async worker using pooled connection."""
    # Get connection from async pool.
    # connect() is a coroutine, so await it before entering the context.
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase',
        pool_name='async_pool',
        pool_size=5
    ) as connection:

        async with await connection.cursor() as cursor:
            await cursor.execute("SELECT SLEEP(%s)", (1,))
            await cursor.fetchone()

    print(f"Async worker {worker_id} completed")

524

525

async def main():
    """Start ten workers and wait until every one of them has finished."""
    # Build all worker coroutines up front and run them concurrently
    await asyncio.gather(*(worker_task(worker_id) for worker_id in range(10)))
    print("All async workers completed")

532

533

asyncio.run(main())

534

```

535

536

### Async Batch Processing

537

538

```python

539

import asyncio

540

import mysql.connector.aio

541

542

async def process_batch(batch_data: list[dict]):
    """Process batch of data asynchronously.

    Args:
        batch_data: items to insert; each must provide ``'id'`` and ``'value'``.
    """
    # Built-in generics are used in the annotation so this example runs
    # without a separate ``typing`` import.
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        async with await connection.cursor() as cursor:
            # Prepare batch insert
            insert_query = "INSERT INTO processed_data (id, value, timestamp) VALUES (%s, %s, NOW())"

            # Execute batch asynchronously
            for item in batch_data:
                await cursor.execute(insert_query, (item['id'], item['value']))

        # One commit for the whole batch
        await connection.commit()
        print(f"Processed batch of {len(batch_data)} items")

561

562

async def main():
    """Run three sample batches through process_batch concurrently."""
    # Sample data batches
    sample_batches = (
        [{'id': 1, 'value': 'A'}, {'id': 2, 'value': 'B'}],
        [{'id': 3, 'value': 'C'}, {'id': 4, 'value': 'D'}],
        [{'id': 5, 'value': 'E'}, {'id': 6, 'value': 'F'}],
    )

    # Process all batches concurrently
    await asyncio.gather(*map(process_batch, sample_batches))
    print("All batches processed")

574

575

asyncio.run(main())

576

```

577

578

### Async Result Streaming

579

580

```python

581

import asyncio

582

import mysql.connector.aio

583

584

async def stream_large_dataset():
    """Stream large dataset asynchronously."""
    async with await mysql.connector.aio.connect(
        host='localhost',
        user='myuser',
        password='mypassword',
        database='mydatabase'
    ) as connection:

        async with await connection.cursor() as cursor:
            await cursor.execute("SELECT * FROM large_table ORDER BY id")

            # Stream results asynchronously
            count = 0
            async for row in cursor:
                # Process each row as it arrives
                print(f"Processing row {count}: {row[0]}")
                count += 1

                # Yield control to other tasks periodically
                if count % 1000 == 0:
                    await asyncio.sleep(0)  # Yield control

            print(f"Streamed {count} rows")

608

609

asyncio.run(stream_large_dataset())

610

```

611

612

### Async with Multiple Databases

613

614

```python

615

import asyncio

616

import mysql.connector.aio

617

618

async def sync_data_between_databases():
    """Sync data between two databases asynchronously.

    Connections and cursors are opened with nested async context managers,
    so each one already opened is closed even if a later step fails.
    """
    # Connect to source database
    async with await mysql.connector.aio.connect(
        host='source.mysql.example.com',
        user='myuser',
        password='mypassword',
        database='source_db'
    ) as source_conn:

        # Connect to destination database
        async with await mysql.connector.aio.connect(
            host='dest.mysql.example.com',
            user='myuser',
            password='mypassword',
            database='dest_db'
        ) as dest_conn:

            # Get cursors for both connections (cursor() must be awaited)
            async with await source_conn.cursor(dictionary=True) as source_cursor:
                async with await dest_conn.cursor() as dest_cursor:
                    # Read from source
                    await source_cursor.execute("SELECT * FROM users WHERE updated_at > %s", ('2024-01-01',))

                    # Process and insert to destination
                    async for user in source_cursor:
                        await dest_cursor.execute(
                            "INSERT INTO users (id, name, email) VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE name=%s, email=%s",
                            (user['id'], user['name'], user['email'], user['name'], user['email'])
                        )

                    # Commit destination changes
                    await dest_conn.commit()
                    print("Data sync completed")

662

663

asyncio.run(sync_data_between_databases())

664

```

665

666

### Async with Timeout and Error Handling

667

668

```python

669

import asyncio

670

import mysql.connector.aio

671

672

async def query_with_timeout():
    """Execute query with timeout and error handling."""
    try:
        # Set timeout for entire operation
        async with asyncio.timeout(30):  # 30 second timeout
            async with await mysql.connector.aio.connect(
                host='localhost',
                user='myuser',
                password='mypassword',
                database='mydatabase',
                connect_timeout=10  # Connection timeout
            ) as connection:

                async with await connection.cursor() as cursor:
                    # Long-running query
                    await cursor.execute("SELECT * FROM large_table WHERE complex_condition = %s", ('value',))

                    results = await cursor.fetchall()
                    print(f"Query returned {len(results)} rows")

    except asyncio.TimeoutError:
        print("Query timed out")
    except mysql.connector.Error as err:
        print(f"Database error: {err}")
    except Exception as err:
        print(f"Unexpected error: {err}")

698

699

asyncio.run(query_with_timeout())

700

```