or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

asyncio-operations.mdchange-streams.mdclient-encryption.mdcursor-operations.mdgridfs-operations.mdindex.mdtornado-operations.mdweb-integration.md

asyncio-operations.mddocs/

0

# AsyncIO Operations

1

2

Core MongoDB client, database, and collection operations optimized for Python's asyncio framework. These classes provide native async/await syntax support and integrate seamlessly with asyncio applications.

3

4

## Capabilities

5

6

### AsyncIO Client

7

8

The main entry point for asyncio-based MongoDB operations, providing connection management and database access.

9

10

```python { .api }

11

class AsyncIOMotorClient:

12

def __init__(

13

self,

14

host: Union[str, List[str]] = 'localhost',

15

port: int = 27017,

16

document_class: type = dict,

17

tz_aware: bool = False,

18

connect: bool = True,

19

**kwargs

20

):

21

"""

22

Create a new AsyncIOMotorClient connection to MongoDB.

23

24

Parameters:

25

- host: MongoDB host(s) to connect to

26

- port: Port number for MongoDB connection

27

- document_class: Default class for documents returned from queries

28

- tz_aware: Whether datetime objects should be timezone-aware

29

- connect: Whether to connect immediately or lazily

30

- **kwargs: Additional connection options (maxPoolSize, ssl, etc.)

31

"""

32

33

def get_database(self, name: Optional[str] = None, **kwargs) -> AsyncIOMotorDatabase:

34

"""Get a database instance."""

35

36

def get_default_database(self, **kwargs) -> AsyncIOMotorDatabase:

37

"""Get the default database specified in connection URI."""

38

39

async def list_databases(self, session=None, **kwargs) -> AsyncIOMotorCommandCursor:

40

"""List all databases on the MongoDB server."""

41

42

async def list_database_names(self, session=None, **kwargs) -> List[str]:

43

"""Get names of all databases on the server."""

44

45

async def server_info(self) -> Dict[str, Any]:

46

"""Get information about the MongoDB server."""

47

48

async def start_session(self, **kwargs) -> AsyncIOMotorClientSession:

49

"""Start a logical session for use with transactions."""

50

51

async def drop_database(self, name_or_database: Union[str, AsyncIOMotorDatabase], session=None) -> None:

52

"""Drop a database."""

53

54

def close(self) -> None:

55

"""Close all connections to MongoDB."""

56

57

def watch(self, pipeline: Optional[List[Dict[str, Any]]] = None, **kwargs) -> AsyncIOMotorChangeStream:

58

"""Watch for changes across all collections in all databases."""

59

60

# Read-only properties

61

@property

62

def address(self) -> Optional[Tuple[str, int]]:

63

"""Current connection address."""

64

65

@property

66

def primary(self) -> Optional[Tuple[str, int]]:

67

"""Primary server address in replica set."""

68

69

@property

70

def secondaries(self) -> Set[Tuple[str, int]]:

71

"""Secondary server addresses in replica set."""

72

73

@property

74

def is_primary(self) -> bool:

75

"""Whether connected to a primary server."""

76

77

@property

78

def is_mongos(self) -> bool:

79

"""Whether connected to a mongos router."""

80

```

81

82

### AsyncIO Database

83

84

Represents a MongoDB database, providing access to collections and database-level operations.

85

86

```python { .api }

87

class AsyncIOMotorDatabase:

88

@property

89

def name(self) -> str:

90

"""Database name."""

91

92

@property

93

def client(self) -> AsyncIOMotorClient:

94

"""The client that owns this database."""

95

96

def get_collection(self, name: str, **kwargs) -> AsyncIOMotorCollection:

97

"""Get a collection in this database."""

98

99

def __getitem__(self, name: str) -> AsyncIOMotorCollection:

100

"""Get a collection using dictionary-style access."""

101

102

def __getattr__(self, name: str) -> AsyncIOMotorCollection:

103

"""Get a collection using attribute-style access."""

104

105

async def create_collection(

106

self,

107

name: str,

108

codec_options=None,

109

read_preference=None,

110

write_concern=None,

111

read_concern=None,

112

session=None,

113

**kwargs

114

) -> AsyncIOMotorCollection:

115

"""Create a new collection in this database."""

116

117

async def drop_collection(

118

self,

119

name_or_collection: Union[str, AsyncIOMotorCollection],

120

session=None

121

) -> None:

122

"""Drop a collection."""

123

124

async def list_collection_names(

125

self,

126

session=None,

127

filter: Optional[Dict[str, Any]] = None,

128

**kwargs

129

) -> List[str]:

130

"""Get names of all collections in this database."""

131

132

async def list_collections(

133

self,

134

session=None,

135

filter: Optional[Dict[str, Any]] = None,

136

**kwargs

137

) -> AsyncIOMotorCommandCursor:

138

"""List collections with metadata."""

139

140

async def command(

141

self,

142

command: Union[str, Dict[str, Any]],

143

value: Any = 1,

144

check: bool = True,

145

allowable_errors: Optional[List[str]] = None,

146

session=None,

147

**kwargs

148

) -> Dict[str, Any]:

149

"""Execute a database command."""

150

151

def aggregate(

152

self,

153

pipeline: List[Dict[str, Any]],

154

session=None,

155

**kwargs

156

) -> AsyncIOMotorCommandCursor:

157

"""Execute an aggregation pipeline on the database."""

158

159

def watch(

160

self,

161

pipeline: Optional[List[Dict[str, Any]]] = None,

162

session=None,

163

**kwargs

164

) -> AsyncIOMotorChangeStream:

165

"""Watch for changes on all collections in this database."""

166

```

167

168

### AsyncIO Collection

169

170

Represents a MongoDB collection, providing document-level operations like insert, find, update, and delete.

171

172

```python { .api }

173

class AsyncIOMotorCollection:

174

@property

175

def name(self) -> str:

176

"""Collection name."""

177

178

@property

179

def full_name(self) -> str:

180

"""Full collection name (database.collection)."""

181

182

@property

183

def database(self) -> AsyncIOMotorDatabase:

184

"""The database that owns this collection."""

185

186

# Insert Operations

187

async def insert_one(

188

self,

189

document: Dict[str, Any],

190

bypass_document_validation: bool = False,

191

session=None

192

) -> InsertOneResult:

193

"""Insert a single document."""

194

195

async def insert_many(

196

self,

197

documents: List[Dict[str, Any]],

198

ordered: bool = True,

199

bypass_document_validation: bool = False,

200

session=None

201

) -> InsertManyResult:

202

"""Insert multiple documents."""

203

204

# Find Operations

205

async def find_one(

206

self,

207

filter: Optional[Dict[str, Any]] = None,

208

*args,

209

projection: Optional[Dict[str, Any]] = None,

210

session=None,

211

**kwargs

212

) -> Optional[Dict[str, Any]]:

213

"""Find a single document."""

214

215

def find(

216

self,

217

filter: Optional[Dict[str, Any]] = None,

218

projection: Optional[Dict[str, Any]] = None,

219

skip: int = 0,

220

limit: int = 0,

221

no_cursor_timeout: bool = False,

222

cursor_type=None,

223

sort: Optional[List[Tuple[str, int]]] = None,

224

allow_partial_results: bool = False,

225

batch_size: int = 0,

226

collation=None,

227

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

228

max_time_ms: Optional[int] = None,

229

session=None,

230

**kwargs

231

) -> AsyncIOMotorCursor:

232

"""Find multiple documents, returns a cursor."""

233

234

async def find_one_and_delete(

235

self,

236

filter: Dict[str, Any],

237

projection: Optional[Dict[str, Any]] = None,

238

sort: Optional[List[Tuple[str, int]]] = None,

239

session=None,

240

**kwargs

241

) -> Optional[Dict[str, Any]]:

242

"""Find and delete a single document."""

243

244

async def find_one_and_replace(

245

self,

246

filter: Dict[str, Any],

247

replacement: Dict[str, Any],

248

projection: Optional[Dict[str, Any]] = None,

249

sort: Optional[List[Tuple[str, int]]] = None,

250

upsert: bool = False,

251

return_document: bool = False,

252

session=None,

253

**kwargs

254

) -> Optional[Dict[str, Any]]:

255

"""Find and replace a single document."""

256

257

async def find_one_and_update(

258

self,

259

filter: Dict[str, Any],

260

update: Dict[str, Any],

261

projection: Optional[Dict[str, Any]] = None,

262

sort: Optional[List[Tuple[str, int]]] = None,

263

upsert: bool = False,

264

return_document: bool = False,

265

session=None,

266

**kwargs

267

) -> Optional[Dict[str, Any]]:

268

"""Find and update a single document."""

269

270

# Update Operations

271

async def update_one(

272

self,

273

filter: Dict[str, Any],

274

update: Dict[str, Any],

275

upsert: bool = False,

276

bypass_document_validation: bool = False,

277

collation=None,

278

array_filters: Optional[List[Dict[str, Any]]] = None,

279

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

280

session=None

281

) -> UpdateResult:

282

"""Update a single document."""

283

284

async def update_many(

285

self,

286

filter: Dict[str, Any],

287

update: Dict[str, Any],

288

upsert: bool = False,

289

array_filters: Optional[List[Dict[str, Any]]] = None,

290

bypass_document_validation: bool = False,

291

collation=None,

292

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

293

session=None

294

) -> UpdateResult:

295

"""Update multiple documents."""

296

297

async def replace_one(

298

self,

299

filter: Dict[str, Any],

300

replacement: Dict[str, Any],

301

upsert: bool = False,

302

bypass_document_validation: bool = False,

303

collation=None,

304

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

305

session=None

306

) -> UpdateResult:

307

"""Replace a single document."""

308

309

# Delete Operations

310

async def delete_one(

311

self,

312

filter: Dict[str, Any],

313

collation=None,

314

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

315

session=None

316

) -> DeleteResult:

317

"""Delete a single document."""

318

319

async def delete_many(

320

self,

321

filter: Dict[str, Any],

322

collation=None,

323

hint: Optional[Union[str, List[Tuple[str, int]]]] = None,

324

session=None

325

) -> DeleteResult:

326

"""Delete multiple documents."""

327

328

# Count Operations

329

async def count_documents(

330

self,

331

filter: Dict[str, Any],

332

session=None,

333

**kwargs

334

) -> int:

335

"""Count documents matching filter."""

336

337

async def estimated_document_count(self, **kwargs) -> int:

338

"""Estimate total document count."""

339

340

# Index Operations

341

async def create_index(

342

self,

343

keys: Union[str, List[Tuple[str, int]]],

344

session=None,

345

**kwargs

346

) -> str:

347

"""Create a single index."""

348

349

async def create_indexes(

350

self,

351

indexes: List[Dict[str, Any]],

352

session=None,

353

**kwargs

354

) -> List[str]:

355

"""Create multiple indexes."""

356

357

async def drop_index(

358

self,

359

index: Union[str, List[Tuple[str, int]]],

360

session=None,

361

**kwargs

362

) -> None:

363

"""Drop a single index."""

364

365

def list_indexes(self, session=None) -> AsyncIOMotorCommandCursor:

366

"""List all indexes on the collection."""

367

368

# Aggregation Operations

369

def aggregate(

370

self,

371

pipeline: List[Dict[str, Any]],

372

session=None,

373

**kwargs

374

) -> AsyncIOMotorCommandCursor:

375

"""Execute an aggregation pipeline."""

376

377

def distinct(

378

self,

379

key: str,

380

filter: Optional[Dict[str, Any]] = None,

381

session=None,

382

**kwargs

383

) -> AsyncIOMotorCommandCursor:

384

"""Get distinct values for a field."""

385

386

# Bulk Operations

387

def bulk_write(

388

self,

389

requests: List[Any],

390

ordered: bool = True,

391

bypass_document_validation: bool = False,

392

session=None

393

) -> Any:

394

"""Execute bulk write operations."""

395

396

# Change Streams

397

def watch(

398

self,

399

pipeline: Optional[List[Dict[str, Any]]] = None,

400

full_document: Optional[str] = None,

401

resume_after: Optional[Dict[str, Any]] = None,

402

max_await_time_ms: Optional[int] = None,

403

batch_size: Optional[int] = None,

404

collation=None,

405

start_at_operation_time=None,

406

session=None,

407

start_after: Optional[Dict[str, Any]] = None,

408

**kwargs

409

) -> AsyncIOMotorChangeStream:

410

"""Watch for changes on the collection."""

411

412

# Collection Management

413

async def drop(self, session=None) -> None:

414

"""Drop the collection."""

415

416

async def rename(self, new_name: str, session=None, **kwargs) -> None:

417

"""Rename the collection."""

418

```

419

420

### AsyncIO Client Session

421

422

Client session for transaction support and causally consistent reads in AsyncIO applications.

423

424

```python { .api }

425

class AsyncIOMotorClientSession:

426

"""

427

A session for ordering sequential operations and transactions.

428

429

Created via AsyncIOMotorClient.start_session(), not directly instantiated.

430

"""

431

432

# Properties

433

@property

434

def client(self) -> AsyncIOMotorClient:

435

"""The client this session was created from."""

436

437

@property

438

def cluster_time(self) -> Optional[Dict[str, Any]]:

439

"""The cluster time returned by the last operation."""

440

441

@property

442

def has_ended(self) -> bool:

443

"""Whether this session has ended."""

444

445

@property

446

def in_transaction(self) -> bool:

447

"""Whether this session is in an active transaction."""

448

449

@property

450

def operation_time(self) -> Optional[Any]:

451

"""The operation time returned by the last operation."""

452

453

@property

454

def options(self) -> Dict[str, Any]:

455

"""The options used to create this session."""

456

457

@property

458

def session_id(self) -> Dict[str, Any]:

459

"""A BSON document identifying this session."""

460

461

# Transaction Methods

462

def start_transaction(

463

self,

464

read_concern: Optional[Any] = None,

465

write_concern: Optional[Any] = None,

466

read_preference: Optional[Any] = None,

467

max_commit_time_ms: Optional[int] = None

468

) -> Any:

469

"""

470

Start a multi-statement transaction.

471

472

Returns a context manager for the transaction.

473

Use with async context manager syntax.

474

475

Parameters:

476

- read_concern: Read concern for the transaction

477

- write_concern: Write concern for the transaction

478

- read_preference: Read preference for the transaction

479

- max_commit_time_ms: Maximum time for commit operation

480

481

Returns:

482

Transaction context manager

483

"""

484

485

async def commit_transaction(self) -> None:

486

"""Commit the current transaction."""

487

488

async def abort_transaction(self) -> None:

489

"""Abort the current transaction."""

490

491

async def with_transaction(

492

self,

493

coro: Callable,

494

read_concern: Optional[Any] = None,

495

write_concern: Optional[Any] = None,

496

read_preference: Optional[Any] = None,

497

max_commit_time_ms: Optional[int] = None

498

) -> Any:

499

"""

500

Execute a coroutine within a transaction.

501

502

Automatically handles transaction retry logic for transient errors.

503

Will retry the entire transaction for up to 120 seconds.

504

505

Parameters:

506

- coro: Async function that takes this session as first argument

507

- read_concern: Read concern for the transaction

508

- write_concern: Write concern for the transaction

509

- read_preference: Read preference for the transaction

510

- max_commit_time_ms: Maximum time for commit operation

511

512

Returns:

513

Return value from the coroutine

514

"""

515

516

# Session Management

517

async def end_session(self) -> None:

518

"""End this session."""

519

520

def advance_cluster_time(self, cluster_time: Dict[str, Any]) -> None:

521

"""Advance the cluster time for this session."""

522

523

def advance_operation_time(self, operation_time: Any) -> None:

524

"""Advance the operation time for this session."""

525

526

# Context Manager Protocol

527

async def __aenter__(self) -> AsyncIOMotorClientSession:

528

"""Async context manager entry."""

529

530

async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:

531

"""Async context manager exit."""

532

```

533

534

## Usage Examples

535

536

### Basic AsyncIO Operations

537

538

```python

539

import asyncio

540

import motor.motor_asyncio

541

542

async def example():

543

client = motor.motor_asyncio.AsyncIOMotorClient()

544

db = client.test_database

545

collection = db.test_collection

546

547

# Insert operations

548

result = await collection.insert_one({"name": "Alice", "age": 30})

549

print(f"Inserted ID: {result.inserted_id}")

550

551

results = await collection.insert_many([

552

{"name": "Bob", "age": 25},

553

{"name": "Charlie", "age": 35}

554

])

555

print(f"Inserted IDs: {results.inserted_ids}")

556

557

# Find operations

558

document = await collection.find_one({"name": "Alice"})

559

print(f"Found: {document}")

560

561

# Update operations

562

result = await collection.update_one(

563

{"name": "Alice"},

564

{"$set": {"age": 31}}

565

)

566

print(f"Modified {result.modified_count} document(s)")

567

568

# Delete operations

569

result = await collection.delete_one({"name": "Alice"})

570

print(f"Deleted {result.deleted_count} document(s)")

571

572

client.close()

573

574

asyncio.run(example())

575

```

576

577

### Cursor Iteration

578

579

```python

580

async def cursor_example():

581

client = motor.motor_asyncio.AsyncIOMotorClient()

582

collection = client.test_database.test_collection

583

584

# Async iteration

585

async for document in collection.find({"age": {"$gte": 18}}):

586

print(document)

587

588

# To list with limit

589

cursor = collection.find().limit(10)

590

documents = await cursor.to_list(length=10)

591

print(f"Found {len(documents)} documents")

592

593

client.close()

594

```

595

596

### Error Handling

597

598

```python

599

import pymongo.errors

600

601

async def error_handling_example():

602

client = motor.motor_asyncio.AsyncIOMotorClient()

603

collection = client.test_database.test_collection

604

605

try:

606

await collection.insert_one({"_id": 1, "name": "test"})

607

await collection.insert_one({"_id": 1, "name": "duplicate"}) # Will fail

608

except pymongo.errors.DuplicateKeyError as e:

609

print(f"Duplicate key error: {e}")

610

except pymongo.errors.ConnectionFailure as e:

611

print(f"Connection failed: {e}")

612

finally:

613

client.close()

614

```

615

616

## Types

617

618

```python { .api }

619

from typing import Any, Optional, Union, Dict, List, Tuple, Set

620

from datetime import datetime

621

622

AsyncIOMotorClientSession = Any # Actual session type from motor

623

AsyncIOMotorCommandCursor = Any # Command cursor type

624

AsyncIOMotorCursor = Any # Query cursor type

625

AsyncIOMotorChangeStream = Any # Change stream type

626

```