or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

connections.mdcursors.mdindex.mdpooling.mdsqlalchemy.md

sqlalchemy.mddocs/

0

# SQLAlchemy Integration

1

2

High-level database operations using SQLAlchemy's expression language and ORM capabilities with async/await support. The `aiomysql.sa` subpackage provides SQLAlchemy-compatible interfaces for advanced database operations.

3

4

## Core Imports

5

6

```python

7

import aiomysql.sa

8

from aiomysql.sa import create_engine, SAConnection, Engine

9

```

10

11

## Capabilities

12

13

### Engine Creation

14

15

Create a SQLAlchemy-compatible async engine with connection pooling.

16

17

```python { .api }

18

async def create_engine(

19

minsize: int = 1,

20

maxsize: int = 10,

21

loop = None,

22

dialect = None,

23

pool_recycle: int = -1,

24

compiled_cache = None,

25

**kwargs

26

) -> Engine:

27

"""

28

Create SQLAlchemy-compatible async engine.

29

30

Parameters:

31

- minsize: Minimum number of connections in pool

32

- maxsize: Maximum number of connections in pool

33

- loop: Event loop to use

34

- dialect: SQLAlchemy dialect instance

35

- pool_recycle: Seconds after which to recreate connections

36

- compiled_cache: Compiled query cache

37

- **kwargs: Connection parameters (same as connect())

38

39

Returns:

40

Engine context manager

41

"""

42

```

43

44

### Engine Management

45

46

The Engine class manages connection pools and provides SQLAlchemy-style database access.

47

48

```python { .api }

49

class Engine:

50

@property

51

def dialect(self):

52

"""SQLAlchemy dialect instance."""

53

54

@property

55

def name(self) -> str:

56

"""Engine name."""

57

58

@property

59

def driver(self) -> str:

60

"""Database driver name."""

61

62

@property

63

def minsize(self) -> int:

64

"""Minimum pool size."""

65

66

@property

67

def maxsize(self) -> int:

68

"""Maximum pool size."""

69

70

@property

71

def size(self) -> int:

72

"""Current pool size."""

73

74

@property

75

def freesize(self) -> int:

76

"""Number of free connections."""

77

78

def acquire(self) -> SAConnection:

79

"""

80

Acquire a connection from the engine pool.

81

82

Returns:

83

SAConnection context manager

84

"""

85

86

def release(self, conn: SAConnection) -> None:

87

"""

88

Return a connection to the pool.

89

90

Parameters:

91

- conn: Connection to release

92

"""

93

94

def close(self) -> None:

95

"""Close the engine and mark connections for closure."""

96

97

def terminate(self) -> None:

98

"""Terminate engine immediately, closing all connections."""

99

100

async def wait_closed(self) -> None:

101

"""Wait for engine to be completely closed."""

102

```

103

104

### SQLAlchemy Connection

105

106

SQLAlchemy-style connection wrapper providing high-level database operations.

107

108

```python { .api }

109

class SAConnection:

110

@property

111

def closed(self) -> bool:

112

"""Whether the connection is closed."""

113

114

@property

115

def connection(self) -> Connection:

116

"""Underlying aiomysql connection."""

117

118

@property

119

def in_transaction(self) -> bool:

120

"""Whether connection is in a transaction."""

121

122

def execute(self, query, *multiparams, **params) -> ResultProxy:

123

"""

124

Execute a SQLAlchemy query.

125

126

Parameters:

127

- query: SQLAlchemy selectable or text query

128

- multiparams: Multiple parameter sets for executemany-style

129

- params: Single parameter set

130

131

Returns:

132

ResultProxy for accessing results

133

"""

134

135

async def scalar(self, query, *multiparams, **params):

136

"""

137

Execute query and return scalar result.

138

139

Parameters:

140

- query: SQLAlchemy selectable or text query

141

- multiparams: Multiple parameter sets

142

- params: Single parameter set

143

144

Returns:

145

Single scalar value

146

"""

147

148

def begin(self) -> RootTransaction:

149

"""

150

Begin a transaction.

151

152

Returns:

153

Transaction context manager

154

"""

155

156

async def begin_nested(self) -> NestedTransaction:

157

"""

158

Begin a nested transaction (savepoint).

159

160

Returns:

161

Nested transaction instance

162

"""

163

164

async def begin_twophase(self, xid = None) -> TwoPhaseTransaction:

165

"""

166

Begin a two-phase transaction.

167

168

Parameters:

169

- xid: Transaction ID (generated if None)

170

171

Returns:

172

Two-phase transaction instance

173

"""

174

175

async def recover_twophase(self) -> list:

176

"""

177

Get list of prepared transaction IDs.

178

179

Returns:

180

List of transaction IDs ready for commit/rollback

181

"""

182

183

async def rollback_prepared(self, xid, *, is_prepared: bool = True) -> None:

184

"""

185

Rollback a prepared two-phase transaction.

186

187

Parameters:

188

- xid: Transaction ID to rollback

189

- is_prepared: Whether transaction is already prepared

190

"""

191

192

async def commit_prepared(self, xid, *, is_prepared: bool = True) -> None:

193

"""

194

Commit a prepared two-phase transaction.

195

196

Parameters:

197

- xid: Transaction ID to commit

198

- is_prepared: Whether transaction is already prepared

199

"""

200

201

async def close(self) -> None:

202

"""Close the connection."""

203

```

204

205

### Transaction Classes

206

207

Transaction management classes for various transaction types.

208

209

```python { .api }

210

class Transaction:

211

@property

212

def is_active(self) -> bool:

213

"""Whether transaction is active."""

214

215

@property

216

def connection(self) -> SAConnection:

217

"""Connection associated with transaction."""

218

219

async def commit(self) -> None:

220

"""Commit the transaction."""

221

222

async def rollback(self) -> None:

223

"""Rollback the transaction."""

224

225

async def close(self) -> None:

226

"""Close the transaction."""

227

228

class RootTransaction(Transaction):

229

"""Root-level database transaction."""

230

231

class NestedTransaction(Transaction):

232

"""Nested transaction using savepoints."""

233

234

class TwoPhaseTransaction(Transaction):

235

"""Two-phase (XA) transaction."""

236

237

@property

238

def xid(self) -> str:

239

"""Transaction ID."""

240

241

async def prepare(self) -> None:

242

"""Prepare transaction for commit."""

243

```

244

245

### Result Classes

246

247

Classes for handling query results in SQLAlchemy style.

248

249

```python { .api }

250

class ResultProxy:

251

@property

252

def dialect(self):

253

"""SQLAlchemy dialect."""

254

255

@property

256

def cursor(self) -> Cursor:

257

"""Underlying cursor."""

258

259

@property

260

def rowcount(self) -> int:

261

"""Number of affected rows."""

262

263

@property

264

def lastrowid(self) -> int:

265

"""Last inserted row ID."""

266

267

@property

268

def returns_rows(self) -> bool:

269

"""Whether query returns rows."""

270

271

@property

272

def closed(self) -> bool:

273

"""Whether result is closed."""

274

275

async def fetchone(self) -> RowProxy:

276

"""

277

Fetch next row.

278

279

Returns:

280

RowProxy instance or None

281

"""

282

283

async def fetchall(self) -> list:

284

"""

285

Fetch all remaining rows.

286

287

Returns:

288

List of RowProxy instances

289

"""

290

291

async def fetchmany(self, size: int = None) -> list:

292

"""

293

Fetch multiple rows.

294

295

Parameters:

296

- size: Number of rows to fetch

297

298

Returns:

299

List of RowProxy instances

300

"""

301

302

async def first(self) -> RowProxy:

303

"""

304

Fetch first row and close result.

305

306

Returns:

307

First RowProxy or None

308

"""

309

310

async def scalar(self):

311

"""

312

Fetch scalar value from first row.

313

314

Returns:

315

Single value from first column of first row

316

"""

317

318

def keys(self) -> list:

319

"""

320

Get column names.

321

322

Returns:

323

List of column names

324

"""

325

326

async def close(self) -> None:

327

"""Close the result."""

328

329

class RowProxy:

330

"""Single result row with dict-like and sequence-like access."""

331

332

def as_tuple(self) -> tuple:

333

"""

334

Convert row to tuple.

335

336

Returns:

337

Row data as tuple

338

"""

339

340

def __getitem__(self, key):

341

"""

342

Access column by name or index.

343

344

Parameters:

345

- key: Column name (str) or index (int)

346

347

Returns:

348

Column value

349

"""

350

351

def __getattr__(self, name):

352

"""

353

Access column as attribute.

354

355

Parameters:

356

- name: Column name

357

358

Returns:

359

Column value

360

"""

361

362

def __contains__(self, key) -> bool:

363

"""

364

Check if column exists.

365

366

Parameters:

367

- key: Column name

368

369

Returns:

370

True if column exists

371

"""

372

```

373

374

### Exception Classes

375

376

SQLAlchemy integration specific exceptions.

377

378

```python { .api }

379

class Error(Exception):

380

"""Base SQLAlchemy integration error."""

381

382

class ArgumentError(Error):

383

"""Invalid or conflicting function arguments."""

384

385

class InvalidRequestError(Error):

386

"""Invalid operations or runtime state errors."""

387

388

class NoSuchColumnError(Error):

389

"""Nonexistent column requested from RowProxy."""

390

391

class ResourceClosedError(Error):

392

"""Operation on closed connection/cursor/result."""

393

```

394

395

## Usage Examples

396

397

### Basic SQLAlchemy Usage

398

399

```python

400

import asyncio

401

import sqlalchemy as sa

402

import aiomysql.sa

403

404

async def sqlalchemy_basic_example():

405

# Create engine

406

engine = await aiomysql.sa.create_engine(

407

host='localhost',

408

port=3306,

409

user='myuser',

410

password='mypass',

411

db='mydatabase',

412

minsize=1,

413

maxsize=5

414

)

415

416

# Define table metadata

417

metadata = sa.MetaData()

418

users = sa.Table('users', metadata,

419

sa.Column('id', sa.Integer, primary_key=True),

420

sa.Column('name', sa.String(50)),

421

sa.Column('email', sa.String(100)))

422

423

# Execute queries

424

async with engine.acquire() as conn:

425

# Select query

426

query = sa.select([users]).where(users.c.id < 10)

427

result = await conn.execute(query)

428

429

async for row in result:

430

print(f"User: {row.name} ({row.email})")

431

432

await result.close()

433

434

# Cleanup

435

engine.close()

436

await engine.wait_closed()

437

438

asyncio.run(sqlalchemy_basic_example())

439

```

440

441

### Transaction Management

442

443

```python

444

async def transaction_example():

445

engine = await aiomysql.sa.create_engine(

446

host='localhost',

447

user='myuser',

448

password='mypass',

449

db='mydatabase'

450

)

451

452

metadata = sa.MetaData()

453

users = sa.Table('users', metadata,

454

sa.Column('id', sa.Integer, primary_key=True),

455

sa.Column('name', sa.String(50)),

456

sa.Column('balance', sa.Numeric(10, 2)))

457

458

async with engine.acquire() as conn:

459

# Begin transaction

460

async with conn.begin() as trans:

461

try:

462

# Transfer money between users

463

await conn.execute(

464

users.update().where(users.c.id == 1).values(

465

balance=users.c.balance - 100

466

)

467

)

468

await conn.execute(

469

users.update().where(users.c.id == 2).values(

470

balance=users.c.balance + 100

471

)

472

)

473

474

# Transaction commits automatically on success

475

print("Transfer completed successfully")

476

477

except Exception as e:

478

# Transaction rolls back automatically on error

479

print(f"Transfer failed: {e}")

480

raise

481

482

engine.close()

483

await engine.wait_closed()

484

```

485

486

### Nested Transactions (Savepoints)

487

488

```python

489

async def nested_transaction_example():

490

engine = await aiomysql.sa.create_engine(

491

host='localhost',

492

user='myuser',

493

password='mypass',

494

db='mydatabase'

495

)

496

497

metadata = sa.MetaData()

498

orders = sa.Table('orders', metadata,

499

sa.Column('id', sa.Integer, primary_key=True),

500

sa.Column('user_id', sa.Integer),

501

sa.Column('total', sa.Numeric(10, 2)))

502

503

async with engine.acquire() as conn:

504

async with conn.begin() as trans:

505

# Main transaction: create order

506

result = await conn.execute(

507

orders.insert().values(user_id=123, total=250.00)

508

)

509

order_id = result.lastrowid

510

511

# Nested transaction: try to apply discount

512

try:

513

async with conn.begin_nested() as nested_trans:

514

# Try to apply discount

515

await conn.execute(

516

orders.update().where(orders.c.id == order_id).values(

517

total=orders.c.total * 0.9 # 10% discount

518

)

519

)

520

521

# Simulate discount validation failure

522

if order_id % 2 == 0: # Even order IDs fail

523

raise ValueError("Discount not applicable")

524

525

print("Discount applied successfully")

526

527

except ValueError:

528

# Nested transaction rolls back, main continues

529

print("Discount failed, order created without discount")

530

531

print(f"Order {order_id} completed")

532

533

engine.close()

534

await engine.wait_closed()

535

```

536

537

### Complex Queries with Joins

538

539

```python

540

async def complex_query_example():

541

engine = await aiomysql.sa.create_engine(

542

host='localhost',

543

user='myuser',

544

password='mypass',

545

db='mydatabase'

546

)

547

548

metadata = sa.MetaData()

549

users = sa.Table('users', metadata,

550

sa.Column('id', sa.Integer, primary_key=True),

551

sa.Column('name', sa.String(50)),

552

sa.Column('department_id', sa.Integer))

553

554

departments = sa.Table('departments', metadata,

555

sa.Column('id', sa.Integer, primary_key=True),

556

sa.Column('name', sa.String(50)))

557

558

async with engine.acquire() as conn:

559

# Complex join query

560

query = sa.select([

561

users.c.name.label('user_name'),

562

departments.c.name.label('dept_name')

563

]).select_from(

564

users.join(departments, users.c.department_id == departments.c.id)

565

).where(

566

departments.c.name.in_(['Engineering', 'Marketing'])

567

).order_by(users.c.name)

568

569

result = await conn.execute(query)

570

571

print("Users in Engineering and Marketing:")

572

rows = await result.fetchall()

573

for row in rows:

574

print(f"{row.user_name} - {row.dept_name}")

575

576

await result.close()

577

578

engine.close()

579

await engine.wait_closed()

580

```

581

582

### Raw SQL with Parameters

583

584

```python

585

async def raw_sql_example():

586

engine = await aiomysql.sa.create_engine(

587

host='localhost',

588

user='myuser',

589

password='mypass',

590

db='mydatabase'

591

)

592

593

async with engine.acquire() as conn:

594

# Raw SQL with parameters

595

query = sa.text("""

596

SELECT u.name, COUNT(o.id) as order_count

597

FROM users u

598

LEFT JOIN orders o ON u.id = o.user_id

599

WHERE u.created_at >= :start_date

600

GROUP BY u.id, u.name

601

HAVING COUNT(o.id) >= :min_orders

602

ORDER BY order_count DESC

603

""")

604

605

result = await conn.execute(query, {

606

'start_date': '2023-01-01',

607

'min_orders': 5

608

})

609

610

print("Top customers by order count:")

611

async for row in result:

612

print(f"{row.name}: {row.order_count} orders")

613

614

await result.close()

615

616

engine.close()

617

await engine.wait_closed()

618

```