0
# AsyncIO Operations
1
2
Core MongoDB client, database, and collection operations optimized for Python's asyncio framework. These classes provide native async/await syntax support and integrate seamlessly with asyncio applications.
3
4
## Capabilities
5
6
### AsyncIO Client
7
8
The main entry point for asyncio-based MongoDB operations, providing connection management and database access.
9
10
```python { .api }
11
class AsyncIOMotorClient:
12
def __init__(
13
self,
14
host: Union[str, List[str]] = 'localhost',
15
port: int = 27017,
16
document_class: type = dict,
17
tz_aware: bool = False,
18
connect: bool = True,
19
**kwargs
20
):
21
"""
22
Create a new AsyncIOMotorClient connection to MongoDB.
23
24
Parameters:
25
- host: MongoDB host(s) to connect to
26
- port: Port number for MongoDB connection
27
- document_class: Default class for documents returned from queries
28
- tz_aware: Whether datetime objects should be timezone-aware
29
- connect: Whether to connect immediately or lazily
30
- **kwargs: Additional connection options (maxPoolSize, ssl, etc.)
31
"""
32
33
def get_database(self, name: Optional[str] = None, **kwargs) -> AsyncIOMotorDatabase:
34
"""Get a database instance."""
35
36
def get_default_database(self, **kwargs) -> AsyncIOMotorDatabase:
37
"""Get the default database specified in connection URI."""
38
39
async def list_databases(self, session=None, **kwargs) -> AsyncIOMotorCommandCursor:
40
"""List all databases on the MongoDB server."""
41
42
async def list_database_names(self, session=None, **kwargs) -> List[str]:
43
"""Get names of all databases on the server."""
44
45
async def server_info(self) -> Dict[str, Any]:
46
"""Get information about the MongoDB server."""
47
48
async def start_session(self, **kwargs) -> AsyncIOMotorClientSession:
49
"""Start a logical session for use with transactions."""
50
51
async def drop_database(self, name_or_database: Union[str, AsyncIOMotorDatabase], session=None) -> None:
52
"""Drop a database."""
53
54
def close(self) -> None:
55
"""Close all connections to MongoDB."""
56
57
def watch(self, pipeline: Optional[List[Dict[str, Any]]] = None, **kwargs) -> AsyncIOMotorChangeStream:
58
"""Watch for changes across all collections in all databases."""
59
60
# Read-only properties
61
@property
62
def address(self) -> Optional[Tuple[str, int]]:
63
"""Current connection address."""
64
65
@property
66
def primary(self) -> Optional[Tuple[str, int]]:
67
"""Primary server address in replica set."""
68
69
@property
70
def secondaries(self) -> Set[Tuple[str, int]]:
71
"""Secondary server addresses in replica set."""
72
73
@property
74
def is_primary(self) -> bool:
75
"""Whether connected to a primary server."""
76
77
@property
78
def is_mongos(self) -> bool:
79
"""Whether connected to a mongos router."""
80
```
81
82
### AsyncIO Database
83
84
Represents a MongoDB database, providing access to collections and database-level operations.
85
86
```python { .api }
87
class AsyncIOMotorDatabase:
88
@property
89
def name(self) -> str:
90
"""Database name."""
91
92
@property
93
def client(self) -> AsyncIOMotorClient:
94
"""The client that owns this database."""
95
96
def get_collection(self, name: str, **kwargs) -> AsyncIOMotorCollection:
97
"""Get a collection in this database."""
98
99
def __getitem__(self, name: str) -> AsyncIOMotorCollection:
100
"""Get a collection using dictionary-style access."""
101
102
def __getattr__(self, name: str) -> AsyncIOMotorCollection:
103
"""Get a collection using attribute-style access."""
104
105
async def create_collection(
106
self,
107
name: str,
108
codec_options=None,
109
read_preference=None,
110
write_concern=None,
111
read_concern=None,
112
session=None,
113
**kwargs
114
) -> AsyncIOMotorCollection:
115
"""Create a new collection in this database."""
116
117
async def drop_collection(
118
self,
119
name_or_collection: Union[str, AsyncIOMotorCollection],
120
session=None
121
) -> None:
122
"""Drop a collection."""
123
124
async def list_collection_names(
125
self,
126
session=None,
127
filter: Optional[Dict[str, Any]] = None,
128
**kwargs
129
) -> List[str]:
130
"""Get names of all collections in this database."""
131
132
async def list_collections(
133
self,
134
session=None,
135
filter: Optional[Dict[str, Any]] = None,
136
**kwargs
137
) -> AsyncIOMotorCommandCursor:
138
"""List collections with metadata."""
139
140
async def command(
141
self,
142
command: Union[str, Dict[str, Any]],
143
value: Any = 1,
144
check: bool = True,
145
allowable_errors: Optional[List[str]] = None,
146
session=None,
147
**kwargs
148
) -> Dict[str, Any]:
149
"""Execute a database command."""
150
151
def aggregate(
152
self,
153
pipeline: List[Dict[str, Any]],
154
session=None,
155
**kwargs
156
) -> AsyncIOMotorCommandCursor:
157
"""Execute an aggregation pipeline on the database."""
158
159
def watch(
160
self,
161
pipeline: Optional[List[Dict[str, Any]]] = None,
162
session=None,
163
**kwargs
164
) -> AsyncIOMotorChangeStream:
165
"""Watch for changes on all collections in this database."""
166
```
167
168
### AsyncIO Collection
169
170
Represents a MongoDB collection, providing document-level operations like insert, find, update, and delete.
171
172
```python { .api }
173
class AsyncIOMotorCollection:
174
@property
175
def name(self) -> str:
176
"""Collection name."""
177
178
@property
179
def full_name(self) -> str:
180
"""Full collection name (database.collection)."""
181
182
@property
183
def database(self) -> AsyncIOMotorDatabase:
184
"""The database that owns this collection."""
185
186
# Insert Operations
187
async def insert_one(
188
self,
189
document: Dict[str, Any],
190
bypass_document_validation: bool = False,
191
session=None
192
) -> InsertOneResult:
193
"""Insert a single document."""
194
195
async def insert_many(
196
self,
197
documents: List[Dict[str, Any]],
198
ordered: bool = True,
199
bypass_document_validation: bool = False,
200
session=None
201
) -> InsertManyResult:
202
"""Insert multiple documents."""
203
204
# Find Operations
205
async def find_one(
206
self,
207
filter: Optional[Dict[str, Any]] = None,
208
*args,
209
projection: Optional[Dict[str, Any]] = None,
210
session=None,
211
**kwargs
212
) -> Optional[Dict[str, Any]]:
213
"""Find a single document."""
214
215
def find(
216
self,
217
filter: Optional[Dict[str, Any]] = None,
218
projection: Optional[Dict[str, Any]] = None,
219
skip: int = 0,
220
limit: int = 0,
221
no_cursor_timeout: bool = False,
222
cursor_type=None,
223
sort: Optional[List[Tuple[str, int]]] = None,
224
allow_partial_results: bool = False,
225
batch_size: int = 0,
226
collation=None,
227
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
228
max_time_ms: Optional[int] = None,
229
session=None,
230
**kwargs
231
) -> AsyncIOMotorCursor:
232
"""Find multiple documents, returns a cursor."""
233
234
async def find_one_and_delete(
235
self,
236
filter: Dict[str, Any],
237
projection: Optional[Dict[str, Any]] = None,
238
sort: Optional[List[Tuple[str, int]]] = None,
239
session=None,
240
**kwargs
241
) -> Optional[Dict[str, Any]]:
242
"""Find and delete a single document."""
243
244
async def find_one_and_replace(
245
self,
246
filter: Dict[str, Any],
247
replacement: Dict[str, Any],
248
projection: Optional[Dict[str, Any]] = None,
249
sort: Optional[List[Tuple[str, int]]] = None,
250
upsert: bool = False,
251
return_document: bool = False,
252
session=None,
253
**kwargs
254
) -> Optional[Dict[str, Any]]:
255
"""Find and replace a single document."""
256
257
async def find_one_and_update(
258
self,
259
filter: Dict[str, Any],
260
update: Dict[str, Any],
261
projection: Optional[Dict[str, Any]] = None,
262
sort: Optional[List[Tuple[str, int]]] = None,
263
upsert: bool = False,
264
return_document: bool = False,
265
session=None,
266
**kwargs
267
) -> Optional[Dict[str, Any]]:
268
"""Find and update a single document."""
269
270
# Update Operations
271
async def update_one(
272
self,
273
filter: Dict[str, Any],
274
update: Dict[str, Any],
275
upsert: bool = False,
276
bypass_document_validation: bool = False,
277
collation=None,
278
array_filters: Optional[List[Dict[str, Any]]] = None,
279
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
280
session=None
281
) -> UpdateResult:
282
"""Update a single document."""
283
284
async def update_many(
285
self,
286
filter: Dict[str, Any],
287
update: Dict[str, Any],
288
upsert: bool = False,
289
array_filters: Optional[List[Dict[str, Any]]] = None,
290
bypass_document_validation: bool = False,
291
collation=None,
292
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
293
session=None
294
) -> UpdateResult:
295
"""Update multiple documents."""
296
297
async def replace_one(
298
self,
299
filter: Dict[str, Any],
300
replacement: Dict[str, Any],
301
upsert: bool = False,
302
bypass_document_validation: bool = False,
303
collation=None,
304
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
305
session=None
306
) -> UpdateResult:
307
"""Replace a single document."""
308
309
# Delete Operations
310
async def delete_one(
311
self,
312
filter: Dict[str, Any],
313
collation=None,
314
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
315
session=None
316
) -> DeleteResult:
317
"""Delete a single document."""
318
319
async def delete_many(
320
self,
321
filter: Dict[str, Any],
322
collation=None,
323
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
324
session=None
325
) -> DeleteResult:
326
"""Delete multiple documents."""
327
328
# Count Operations
329
async def count_documents(
330
self,
331
filter: Dict[str, Any],
332
session=None,
333
**kwargs
334
) -> int:
335
"""Count documents matching filter."""
336
337
async def estimated_document_count(self, **kwargs) -> int:
338
"""Estimate total document count."""
339
340
# Index Operations
341
async def create_index(
342
self,
343
keys: Union[str, List[Tuple[str, int]]],
344
session=None,
345
**kwargs
346
) -> str:
347
"""Create a single index."""
348
349
async def create_indexes(
350
self,
351
indexes: List[Dict[str, Any]],
352
session=None,
353
**kwargs
354
) -> List[str]:
355
"""Create multiple indexes."""
356
357
async def drop_index(
358
self,
359
index: Union[str, List[Tuple[str, int]]],
360
session=None,
361
**kwargs
362
) -> None:
363
"""Drop a single index."""
364
365
def list_indexes(self, session=None) -> AsyncIOMotorCommandCursor:
366
"""List all indexes on the collection."""
367
368
# Aggregation Operations
369
def aggregate(
370
self,
371
pipeline: List[Dict[str, Any]],
372
session=None,
373
**kwargs
374
) -> AsyncIOMotorCommandCursor:
375
"""Execute an aggregation pipeline."""
376
377
def distinct(
378
self,
379
key: str,
380
filter: Optional[Dict[str, Any]] = None,
381
session=None,
382
**kwargs
383
) -> AsyncIOMotorCommandCursor:
384
"""Get distinct values for a field."""
385
386
# Bulk Operations
387
def bulk_write(
388
self,
389
requests: List[Any],
390
ordered: bool = True,
391
bypass_document_validation: bool = False,
392
session=None
393
) -> Any:
394
"""Execute bulk write operations."""
395
396
# Change Streams
397
def watch(
398
self,
399
pipeline: Optional[List[Dict[str, Any]]] = None,
400
full_document: Optional[str] = None,
401
resume_after: Optional[Dict[str, Any]] = None,
402
max_await_time_ms: Optional[int] = None,
403
batch_size: Optional[int] = None,
404
collation=None,
405
start_at_operation_time=None,
406
session=None,
407
start_after: Optional[Dict[str, Any]] = None,
408
**kwargs
409
) -> AsyncIOMotorChangeStream:
410
"""Watch for changes on the collection."""
411
412
# Collection Management
413
async def drop(self, session=None) -> None:
414
"""Drop the collection."""
415
416
async def rename(self, new_name: str, session=None, **kwargs) -> None:
417
"""Rename the collection."""
418
```
419
420
### AsyncIO Client Session
421
422
Client session for transaction support and causally consistent reads in AsyncIO applications.
423
424
```python { .api }
425
class AsyncIOMotorClientSession:
426
"""
427
A session for ordering sequential operations and transactions.
428
429
Created via AsyncIOMotorClient.start_session(), not directly instantiated.
430
"""
431
432
# Properties
433
@property
434
def client(self) -> AsyncIOMotorClient:
435
"""The client this session was created from."""
436
437
@property
438
def cluster_time(self) -> Optional[Dict[str, Any]]:
439
"""The cluster time returned by the last operation."""
440
441
@property
442
def has_ended(self) -> bool:
443
"""Whether this session has ended."""
444
445
@property
446
def in_transaction(self) -> bool:
447
"""Whether this session is in an active transaction."""
448
449
@property
450
def operation_time(self) -> Optional[Any]:
451
"""The operation time returned by the last operation."""
452
453
@property
454
def options(self) -> Dict[str, Any]:
455
"""The options used to create this session."""
456
457
@property
458
def session_id(self) -> Dict[str, Any]:
459
"""A BSON document identifying this session."""
460
461
# Transaction Methods
462
def start_transaction(
463
self,
464
read_concern: Optional[Any] = None,
465
write_concern: Optional[Any] = None,
466
read_preference: Optional[Any] = None,
467
max_commit_time_ms: Optional[int] = None
468
) -> Any:
469
"""
470
Start a multi-statement transaction.
471
472
Returns a context manager for the transaction.
473
Use with async context manager syntax.
474
475
Parameters:
476
- read_concern: Read concern for the transaction
477
- write_concern: Write concern for the transaction
478
- read_preference: Read preference for the transaction
479
- max_commit_time_ms: Maximum time for commit operation
480
481
Returns:
482
Transaction context manager
483
"""
484
485
async def commit_transaction(self) -> None:
486
"""Commit the current transaction."""
487
488
async def abort_transaction(self) -> None:
489
"""Abort the current transaction."""
490
491
async def with_transaction(
492
self,
493
coro: Callable,
494
read_concern: Optional[Any] = None,
495
write_concern: Optional[Any] = None,
496
read_preference: Optional[Any] = None,
497
max_commit_time_ms: Optional[int] = None
498
) -> Any:
499
"""
500
Execute a coroutine within a transaction.
501
502
Automatically handles transaction retry logic for transient errors.
503
Will retry the entire transaction for up to 120 seconds.
504
505
Parameters:
506
- coro: Async function that takes this session as first argument
507
- read_concern: Read concern for the transaction
508
- write_concern: Write concern for the transaction
509
- read_preference: Read preference for the transaction
510
- max_commit_time_ms: Maximum time for commit operation
511
512
Returns:
513
Return value from the coroutine
514
"""
515
516
# Session Management
517
async def end_session(self) -> None:
518
"""End this session."""
519
520
def advance_cluster_time(self, cluster_time: Dict[str, Any]) -> None:
521
"""Advance the cluster time for this session."""
522
523
def advance_operation_time(self, operation_time: Any) -> None:
524
"""Advance the operation time for this session."""
525
526
# Context Manager Protocol
527
async def __aenter__(self) -> AsyncIOMotorClientSession:
528
"""Async context manager entry."""
529
530
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
531
"""Async context manager exit."""
532
```
533
534
## Usage Examples
535
536
### Basic AsyncIO Operations
537
538
```python
539
import asyncio
540
import motor.motor_asyncio
541
542
async def example():
543
client = motor.motor_asyncio.AsyncIOMotorClient()
544
db = client.test_database
545
collection = db.test_collection
546
547
# Insert operations
548
result = await collection.insert_one({"name": "Alice", "age": 30})
549
print(f"Inserted ID: {result.inserted_id}")
550
551
results = await collection.insert_many([
552
{"name": "Bob", "age": 25},
553
{"name": "Charlie", "age": 35}
554
])
555
print(f"Inserted IDs: {results.inserted_ids}")
556
557
# Find operations
558
document = await collection.find_one({"name": "Alice"})
559
print(f"Found: {document}")
560
561
# Update operations
562
result = await collection.update_one(
563
{"name": "Alice"},
564
{"$set": {"age": 31}}
565
)
566
print(f"Modified {result.modified_count} document(s)")
567
568
# Delete operations
569
result = await collection.delete_one({"name": "Alice"})
570
print(f"Deleted {result.deleted_count} document(s)")
571
572
client.close()
573
574
asyncio.run(example())
575
```
576
577
### Cursor Iteration
578
579
```python
580
async def cursor_example():
581
client = motor.motor_asyncio.AsyncIOMotorClient()
582
collection = client.test_database.test_collection
583
584
# Async iteration
585
async for document in collection.find({"age": {"$gte": 18}}):
586
print(document)
587
588
# To list with limit
589
cursor = collection.find().limit(10)
590
documents = await cursor.to_list(length=10)
591
print(f"Found {len(documents)} documents")
592
593
client.close()
594
```
595
596
### Error Handling
597
598
```python
599
import pymongo.errors
600
601
async def error_handling_example():
602
client = motor.motor_asyncio.AsyncIOMotorClient()
603
collection = client.test_database.test_collection
604
605
try:
606
await collection.insert_one({"_id": 1, "name": "test"})
607
await collection.insert_one({"_id": 1, "name": "duplicate"}) # Will fail
608
except pymongo.errors.DuplicateKeyError as e:
609
print(f"Duplicate key error: {e}")
610
except pymongo.errors.ConnectionFailure as e:
611
print(f"Connection failed: {e}")
612
finally:
613
client.close()
614
```
615
616
## Types
617
618
```python { .api }
619
from typing import Any, Optional, Union, Dict, List, Tuple, Set
620
from datetime import datetime
621
622
AsyncIOMotorClientSession = Any # Actual session type from motor
623
AsyncIOMotorCommandCursor = Any # Command cursor type
624
AsyncIOMotorCursor = Any # Query cursor type
625
AsyncIOMotorChangeStream = Any # Change stream type
626
```