or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

async.mdcore-engine.mddialects.mdindex.mdorm.mdschema.mdsql-expression.mdtypes.md

async.mddocs/

0

# Async Support

1

2

Asynchronous database operations with async engines, connections, sessions, and ORM support for modern async Python applications. SQLAlchemy's async support enables non-blocking database operations in asyncio applications.

3

4

## Capabilities

5

6

### Async Engine Creation

7

8

Create asynchronous database engines for async/await database operations.

9

10

```python { .api }

11

async def create_async_engine(url, **kwargs):

12

"""

13

Create asynchronous database engine.

14

15

Parameters:

16

- url: str or URL, database connection URL with async driver

17

- echo: bool, log all SQL statements

18

- pool_size: int, connection pool size

19

- max_overflow: int, maximum pool overflow

20

- pool_timeout: int, connection timeout in seconds

21

- pool_recycle: int, connection recycle time

22

23

Returns:

24

AsyncEngine: Asynchronous database engine

25

"""

26

27

class AsyncEngine:

28

"""Asynchronous database engine with connection pooling."""

29

30

async def connect(self):

31

"""

32

Create new async database connection.

33

34

Returns:

35

AsyncConnection: New async database connection

36

"""

37

38

async def execute(self, statement, parameters=None):

39

"""

40

Execute statement with automatic connection management.

41

42

Parameters:

43

- statement: str or executable, SQL statement

44

- parameters: dict or sequence, bound parameters

45

46

Returns:

47

Result: Query results

48

"""

49

50

async def begin(self):

51

"""

52

Begin transaction with automatic connection management.

53

54

Returns:

55

AsyncTransaction: Async transaction context manager

56

"""

57

58

async def dispose(self):

59

"""Close all connections and dispose of connection pool."""

60

61

def sync_engine(self):

62

"""

63

Get synchronous engine for metadata operations.

64

65

Returns:

66

Engine: Synchronous engine for DDL operations

67

"""

68

69

@property

70

def dialect(self):

71

"""Database dialect for this engine."""

72

```

73

74

### Async Connection Management

75

76

Asynchronous connection handling with transaction support.

77

78

```python { .api }

79

class AsyncConnection:

80

"""Asynchronous database connection with transaction support."""

81

82

async def execute(self, statement, parameters=None):

83

"""

84

Execute SQL statement on this async connection.

85

86

Parameters:

87

- statement: str or executable, SQL statement

88

- parameters: dict or sequence, bound parameters

89

90

Returns:

91

Result: Query results

92

"""

93

94

async def begin(self):

95

"""

96

Begin transaction on this connection.

97

98

Returns:

99

AsyncTransaction: Async transaction object

100

"""

101

102

async def commit(self):

103

"""Commit current transaction."""

104

105

async def rollback(self):

106

"""Rollback current transaction."""

107

108

async def close(self):

109

"""Close this async connection."""

110

111

async def scalar(self, statement, parameters=None):

112

"""

113

Execute statement and return scalar result.

114

115

Parameters:

116

- statement: str or executable, SQL statement

117

- parameters: dict or sequence, bound parameters

118

119

Returns:

120

Any: Single scalar value

121

"""

122

123

def get_transaction(self):

124

"""

125

Get current transaction for this connection.

126

127

Returns:

128

AsyncTransaction or None: Current transaction

129

"""

130

```

131

132

### Async Transaction Management

133

134

Asynchronous transaction handling with context manager support.

135

136

```python { .api }

137

class AsyncTransaction:

138

"""Asynchronous database transaction with rollback support."""

139

140

async def commit(self):

141

"""Commit this async transaction."""

142

143

async def rollback(self):

144

"""Rollback this async transaction."""

145

146

async def close(self):

147

"""Close transaction (rollback if not committed)."""

148

149

def is_active(self):

150

"""

151

Check if transaction is active.

152

153

Returns:

154

bool: True if transaction is active

155

"""

156

```

157

158

### Async ORM Session

159

160

Asynchronous ORM session with identity map and unit of work patterns.

161

162

```python { .api }

163

class AsyncSession:

164

"""Asynchronous ORM session with identity map and unit of work."""

165

166

def __init__(self, bind, **kwargs):

167

"""

168

Create async ORM session.

169

170

Parameters:

171

- bind: AsyncEngine for database operations

172

- autoflush: bool, auto-flush before queries (default True)

173

- expire_on_commit: bool, expire objects after commit (default True)

174

"""

175

176

def add(self, instance):

177

"""

178

Add object instance to session.

179

180

Parameters:

181

- instance: mapped object to add

182

"""

183

184

def add_all(self, instances):

185

"""

186

Add multiple object instances to session.

187

188

Parameters:

189

- instances: iterable of mapped objects

190

"""

191

192

async def delete(self, instance):

193

"""

194

Mark object instance for deletion.

195

196

Parameters:

197

- instance: mapped object to delete

198

"""

199

200

async def commit(self):

201

"""Flush pending changes and commit transaction."""

202

203

async def rollback(self):

204

"""Rollback current transaction and expire all objects."""

205

206

async def flush(self):

207

"""Flush pending changes to database without committing."""

208

209

def expunge(self, instance):

210

"""

211

Remove instance from session without deleting.

212

213

Parameters:

214

- instance: mapped object to remove from session

215

"""

216

217

def expunge_all(self):

218

"""Remove all instances from session."""

219

220

async def refresh(self, instance, attribute_names=None):

221

"""

222

Refresh object from database.

223

224

Parameters:

225

- instance: mapped object to refresh

226

- attribute_names: specific attributes to refresh

227

"""

228

229

async def merge(self, instance):

230

"""

231

Merge detached instance into session.

232

233

Parameters:

234

- instance: detached mapped object

235

236

Returns:

237

object: Merged persistent instance

238

"""

239

240

async def execute(self, statement, parameters=None, **kwargs):

241

"""

242

Execute statement with ORM-level processing.

243

244

Parameters:

245

- statement: SQL statement or ORM query

246

- parameters: bind parameters

247

248

Returns:

249

Result: Query results

250

"""

251

252

async def scalar(self, statement, parameters=None, **kwargs):

253

"""

254

Execute statement and return scalar result.

255

256

Parameters:

257

- statement: SQL statement or ORM query

258

- parameters: bind parameters

259

260

Returns:

261

Any: Scalar result value

262

"""

263

264

async def get(self, entity, ident):

265

"""

266

Get object by primary key.

267

268

Parameters:

269

- entity: mapped class

270

- ident: primary key value or tuple

271

272

Returns:

273

object or None: Object instance or None if not found

274

"""

275

276

async def stream(self, statement):

277

"""

278

Execute statement and return async result stream.

279

280

Parameters:

281

- statement: SQL statement or ORM query

282

283

Returns:

284

AsyncResult: Streaming query results

285

"""

286

287

async def close(self):

288

"""Close the async session."""

289

290

def async_sessionmaker(bind=None, **kwargs):

291

"""

292

Create AsyncSession factory.

293

294

Parameters:

295

- bind: AsyncEngine for database operations

296

- kwargs: AsyncSession configuration options

297

298

Returns:

299

async_sessionmaker: AsyncSession factory class

300

"""

301

302

def async_scoped_session(session_factory):

303

"""

304

Create async scoped session with context-local storage.

305

306

Parameters:

307

- session_factory: async_sessionmaker instance

308

309

Returns:

310

async_scoped_session: Context-local async session proxy

311

"""

312

```

313

314

### Async Result Processing

315

316

Asynchronous result iteration and processing.

317

318

```python { .api }

319

class AsyncResult:

320

"""Asynchronous query result with async iteration."""

321

322

async def fetchone(self):

323

"""

324

Fetch next row asynchronously.

325

326

Returns:

327

Row or None: Next row or None if no more rows

328

"""

329

330

async def fetchmany(self, size=None):

331

"""

332

Fetch multiple rows asynchronously.

333

334

Parameters:

335

- size: int, number of rows to fetch

336

337

Returns:

338

List[Row]: List of rows

339

"""

340

341

async def fetchall(self):

342

"""

343

Fetch all remaining rows asynchronously.

344

345

Returns:

346

List[Row]: All remaining rows

347

"""

348

349

async def scalar(self):

350

"""

351

Fetch scalar value from first column of first row.

352

353

Returns:

354

Any: Scalar value or None

355

"""

356

357

def mappings(self):

358

"""

359

Return result as async mapping-like objects.

360

361

Returns:

362

AsyncMappingResult: Result with dict-like row access

363

"""

364

365

def scalars(self, index=0):

366

"""

367

Return result as async scalar values.

368

369

Parameters:

370

- index: int, column index for scalar extraction

371

372

Returns:

373

AsyncScalarResult: Result with scalar value iteration

374

"""

375

376

def partitions(self, size=None):

377

"""

378

Partition result into chunks for async processing.

379

380

Parameters:

381

- size: int, partition size

382

383

Returns:

384

AsyncIterator: Async iterator of row partitions

385

"""

386

387

async def __aiter__(self):

388

"""Async iterator support for result rows."""

389

390

async def __anext__(self):

391

"""Get next row in async iteration."""

392

393

class AsyncScalarResult:

394

"""Async result optimized for scalar value iteration."""

395

396

async def all(self):

397

"""

398

Fetch all scalar values.

399

400

Returns:

401

List[Any]: All scalar values

402

"""

403

404

async def first(self):

405

"""

406

Fetch first scalar value.

407

408

Returns:

409

Any or None: First scalar value or None

410

"""

411

412

async def one(self):

413

"""

414

Fetch exactly one scalar value.

415

416

Returns:

417

Any: Single scalar value

418

419

Raises:

420

NoResultFound: If no results

421

MultipleResultsFound: If multiple results

422

"""

423

424

async def one_or_none(self):

425

"""

426

Fetch one scalar value or None.

427

428

Returns:

429

Any or None: Single scalar value or None

430

431

Raises:

432

MultipleResultsFound: If multiple results

433

"""

434

```

435

436

### Async Utilities and Helpers

437

438

Utility functions for async SQLAlchemy operations.

439

440

```python { .api }

441

async def run_sync(fn, *args, **kwargs):

442

"""

443

Run synchronous function in async context.

444

445

Parameters:

446

- fn: synchronous function to run

447

- args: positional arguments

448

- kwargs: keyword arguments

449

450

Returns:

451

Any: Function result

452

"""

453

454

def greenlet_spawn(fn, *args, **kwargs):

455

"""

456

Spawn greenlet for sync operations in async context.

457

458

Parameters:

459

- fn: function to run in greenlet

460

- args: positional arguments

461

- kwargs: keyword arguments

462

463

Returns:

464

Any: Function result

465

"""

466

```

467

468

## Usage Examples

469

470

### Basic Async Engine Usage

471

472

```python

473

import asyncio

474

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

475

from sqlalchemy.orm import DeclarativeBase, mapped_column, Mapped

476

from sqlalchemy import String, Integer, select

477

478

# Define models

479

class Base(DeclarativeBase):

480

pass

481

482

class User(Base):

483

__tablename__ = 'users'

484

485

id: Mapped[int] = mapped_column(primary_key=True)

486

name: Mapped[str] = mapped_column(String(50))

487

email: Mapped[str] = mapped_column(String(100))

488

489

async def main():

490

# Create async engine (note: driver must support async)

491

engine = create_async_engine(

492

"postgresql+asyncpg://user:pass@localhost/dbname",

493

echo=True

494

)

495

496

# Create tables (using sync engine)

497

async with engine.begin() as conn:

498

await conn.run_sync(Base.metadata.create_all)

499

500

# Use async session

501

async with AsyncSession(engine) as session:

502

# Add new user

503

new_user = User(name="Alice", email="alice@example.com")

504

session.add(new_user)

505

await session.commit()

506

507

# Query users

508

stmt = select(User).where(User.name.like('%Alice%'))

509

result = await session.execute(stmt)

510

users = result.scalars().all()

511

512

for user in users:

513

print(f"User: {user.name}, Email: {user.email}")

514

515

await engine.dispose()

516

517

# Run async function

518

asyncio.run(main())

519

```

520

521

### Async Connection Context Management

522

523

```python

524

async def database_operations():

525

engine = create_async_engine("sqlite+aiosqlite:///async_example.db")

526

527

# Direct connection usage

528

async with engine.connect() as conn:

529

# Execute raw SQL

530

result = await conn.execute(text("SELECT 1"))

531

value = result.scalar()

532

print(f"Result: {value}")

533

534

# Transaction management

535

async with conn.begin():

536

await conn.execute(

537

users.insert().values(name="Bob", email="bob@example.com")

538

)

539

# Automatically committed

540

541

await engine.dispose()

542

```

543

544

### Streaming Results

545

546

```python

547

async def stream_large_dataset():

548

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

549

550

async with AsyncSession(engine) as session:

551

# Stream results for memory efficiency

552

stmt = select(User).where(User.active == True)

553

stream = await session.stream(stmt)

554

555

async for user in stream.scalars():

556

print(f"Processing user: {user.name}")

557

# Process user without loading all into memory

558

559

await engine.dispose()

560

```

561

562

### Async Session Factory

563

564

```python

565

from sqlalchemy.ext.asyncio import async_sessionmaker

566

567

# Create reusable session factory

568

async_session = async_sessionmaker(

569

create_async_engine("postgresql+asyncpg://user:pass@localhost/db"),

570

expire_on_commit=False

571

)

572

573

async def get_user_by_id(user_id: int):

574

async with async_session() as session:

575

return await session.get(User, user_id)

576

577

async def create_user(name: str, email: str):

578

async with async_session() as session:

579

user = User(name=name, email=email)

580

session.add(user)

581

await session.commit()

582

return user

583

```

584

585

### Mixing Sync and Async Operations

586

587

```python

588

async def mixed_operations():

589

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

590

591

# Some operations need sync engine (like metadata operations)

592

sync_engine = engine.sync_engine

593

594

# Create tables with sync engine

595

Base.metadata.create_all(sync_engine)

596

597

# Use async engine for data operations

598

async with AsyncSession(engine) as session:

599

stmt = select(User).limit(10)

600

result = await session.execute(stmt)

601

users = result.scalars().all()

602

603

for user in users:

604

print(f"User: {user.name}")

605

606

await engine.dispose()

607

```

608

609

### Error Handling in Async Context

610

611

```python

612

from sqlalchemy.exc import IntegrityError, NoResultFound

613

614

async def safe_user_operations():

615

engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")

616

617

async with AsyncSession(engine) as session:

618

try:

619

# Attempt to create user with duplicate email

620

user = User(name="Test", email="existing@example.com")

621

session.add(user)

622

await session.commit()

623

except IntegrityError:

624

await session.rollback()

625

print("User with this email already exists")

626

627

try:

628

# Attempt to get non-existent user

629

stmt = select(User).where(User.id == 99999)

630

result = await session.execute(stmt)

631

user = result.scalar_one() # Raises if not found

632

except NoResultFound:

633

print("User not found")

634

635

await engine.dispose()

636

```

637

638

### Async Database Driver Requirements

639

640

```python

641

# Different async drivers for different databases:

642

643

# PostgreSQL with asyncpg

644

engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")

645

646

# MySQL with aiomysql

647

engine = create_async_engine("mysql+aiomysql://user:pass@host/db")

648

649

# SQLite with aiosqlite

650

engine = create_async_engine("sqlite+aiosqlite:///path/to/database.db")

651

652

# Note: The underlying DBAPI driver must support async operations

653

# Standard drivers like psycopg2, PyMySQL, sqlite3 do NOT work with async engines

654

```