0
# Tornado Operations
1
2
Core MongoDB client, database, and collection operations optimized for Tornado's IOLoop and Future objects. These classes integrate seamlessly with Tornado applications and provide callback-based asynchronous operations.
3
4
## Capabilities
5
6
### Tornado Client
7
8
The main entry point for Tornado-based MongoDB operations, providing connection management and database access with Tornado Future objects.
9
10
```python { .api }
11
class MotorClient:
12
def __init__(
13
self,
14
host: Union[str, List[str]] = 'localhost',
15
port: int = 27017,
16
document_class: type = dict,
17
tz_aware: bool = False,
18
connect: bool = True,
19
io_loop=None,
20
**kwargs
21
):
22
"""
23
Create a new MotorClient connection to MongoDB.
24
25
Parameters:
26
- host: MongoDB host(s) to connect to
27
- port: Port number for MongoDB connection
28
- document_class: Default class for documents returned from queries
29
- tz_aware: Whether datetime objects should be timezone-aware
30
- connect: Whether to connect immediately or lazily
31
- io_loop: Tornado IOLoop instance (optional, uses current if None)
32
- **kwargs: Additional connection options (maxPoolSize, ssl, etc.)
33
"""
34
35
def get_database(self, name: Optional[str] = None, **kwargs) -> MotorDatabase:
36
"""Get a database instance."""
37
38
def get_default_database(self, **kwargs) -> MotorDatabase:
39
"""Get the default database specified in connection URI."""
40
41
def list_databases(self, session=None, **kwargs) -> tornado.concurrent.Future:
42
"""List all databases on the MongoDB server."""
43
44
def list_database_names(self, session=None, **kwargs) -> tornado.concurrent.Future:
45
"""Get names of all databases on the server."""
46
47
def server_info(self) -> tornado.concurrent.Future:
48
"""Get information about the MongoDB server."""
49
50
def start_session(self, **kwargs) -> tornado.concurrent.Future:
51
"""Start a logical session for use with transactions."""
52
53
def drop_database(self, name_or_database: Union[str, MotorDatabase], session=None) -> tornado.concurrent.Future:
54
"""Drop a database."""
55
56
def close(self) -> None:
57
"""Close all connections to MongoDB."""
58
59
def watch(self, pipeline: Optional[List[Dict[str, Any]]] = None, **kwargs) -> MotorChangeStream:
60
"""Watch for changes across all collections in all databases."""
61
62
# Read-only properties
63
@property
64
def address(self) -> Optional[Tuple[str, int]]:
65
"""Current connection address."""
66
67
@property
68
def primary(self) -> Optional[Tuple[str, int]]:
69
"""Primary server address in replica set."""
70
71
@property
72
def secondaries(self) -> Set[Tuple[str, int]]:
73
"""Secondary server addresses in replica set."""
74
75
@property
76
def is_primary(self) -> bool:
77
"""Whether connected to a primary server."""
78
79
@property
80
def is_mongos(self) -> bool:
81
"""Whether connected to a mongos router."""
82
```
83
84
### Tornado Database
85
86
Represents a MongoDB database with Tornado Future-based operations.
87
88
```python { .api }
89
class MotorDatabase:
90
@property
91
def name(self) -> str:
92
"""Database name."""
93
94
@property
95
def client(self) -> MotorClient:
96
"""The client that owns this database."""
97
98
def get_collection(self, name: str, **kwargs) -> MotorCollection:
99
"""Get a collection in this database."""
100
101
def __getitem__(self, name: str) -> MotorCollection:
102
"""Get a collection using dictionary-style access."""
103
104
def __getattr__(self, name: str) -> MotorCollection:
105
"""Get a collection using attribute-style access."""
106
107
def create_collection(
108
self,
109
name: str,
110
codec_options=None,
111
read_preference=None,
112
write_concern=None,
113
read_concern=None,
114
session=None,
115
**kwargs
116
) -> tornado.concurrent.Future:
117
"""Create a new collection in this database."""
118
119
def drop_collection(
120
self,
121
name_or_collection: Union[str, MotorCollection],
122
session=None
123
) -> tornado.concurrent.Future:
124
"""Drop a collection."""
125
126
def list_collection_names(
127
self,
128
session=None,
129
filter: Optional[Dict[str, Any]] = None,
130
**kwargs
131
) -> tornado.concurrent.Future:
132
"""Get names of all collections in this database."""
133
134
def list_collections(
135
self,
136
session=None,
137
filter: Optional[Dict[str, Any]] = None,
138
**kwargs
139
) -> tornado.concurrent.Future:
140
"""List collections with metadata."""
141
142
def command(
143
self,
144
command: Union[str, Dict[str, Any]],
145
value: Any = 1,
146
check: bool = True,
147
allowable_errors: Optional[List[str]] = None,
148
session=None,
149
**kwargs
150
) -> tornado.concurrent.Future:
151
"""Execute a database command."""
152
153
def aggregate(
154
self,
155
pipeline: List[Dict[str, Any]],
156
session=None,
157
**kwargs
158
) -> MotorCommandCursor:
159
"""Execute an aggregation pipeline on the database."""
160
161
def watch(
162
self,
163
pipeline: Optional[List[Dict[str, Any]]] = None,
164
session=None,
165
**kwargs
166
) -> MotorChangeStream:
167
"""Watch for changes on all collections in this database."""
168
```
169
170
### Tornado Collection
171
172
Represents a MongoDB collection with Tornado Future-based document operations.
173
174
```python { .api }
175
class MotorCollection:
176
@property
177
def name(self) -> str:
178
"""Collection name."""
179
180
@property
181
def full_name(self) -> str:
182
"""Full collection name (database.collection)."""
183
184
@property
185
def database(self) -> MotorDatabase:
186
"""The database that owns this collection."""
187
188
# Insert Operations
189
def insert_one(
190
self,
191
document: Dict[str, Any],
192
bypass_document_validation: bool = False,
193
session=None
194
) -> tornado.concurrent.Future:
195
"""Insert a single document."""
196
197
def insert_many(
198
self,
199
documents: List[Dict[str, Any]],
200
ordered: bool = True,
201
bypass_document_validation: bool = False,
202
session=None
203
) -> tornado.concurrent.Future:
204
"""Insert multiple documents."""
205
206
# Find Operations
207
def find_one(
208
self,
209
filter: Optional[Dict[str, Any]] = None,
210
*args,
211
projection: Optional[Dict[str, Any]] = None,
212
session=None,
213
**kwargs
214
) -> tornado.concurrent.Future:
215
"""Find a single document."""
216
217
def find(
218
self,
219
filter: Optional[Dict[str, Any]] = None,
220
projection: Optional[Dict[str, Any]] = None,
221
skip: int = 0,
222
limit: int = 0,
223
no_cursor_timeout: bool = False,
224
cursor_type=None,
225
sort: Optional[List[Tuple[str, int]]] = None,
226
allow_partial_results: bool = False,
227
batch_size: int = 0,
228
collation=None,
229
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
230
max_time_ms: Optional[int] = None,
231
session=None,
232
**kwargs
233
) -> MotorCursor:
234
"""Find multiple documents, returns a cursor."""
235
236
def find_one_and_delete(
237
self,
238
filter: Dict[str, Any],
239
projection: Optional[Dict[str, Any]] = None,
240
sort: Optional[List[Tuple[str, int]]] = None,
241
session=None,
242
**kwargs
243
) -> tornado.concurrent.Future:
244
"""Find and delete a single document."""
245
246
def find_one_and_replace(
247
self,
248
filter: Dict[str, Any],
249
replacement: Dict[str, Any],
250
projection: Optional[Dict[str, Any]] = None,
251
sort: Optional[List[Tuple[str, int]]] = None,
252
upsert: bool = False,
253
return_document: bool = False,
254
session=None,
255
**kwargs
256
) -> tornado.concurrent.Future:
257
"""Find and replace a single document."""
258
259
def find_one_and_update(
260
self,
261
filter: Dict[str, Any],
262
update: Dict[str, Any],
263
projection: Optional[Dict[str, Any]] = None,
264
sort: Optional[List[Tuple[str, int]]] = None,
265
upsert: bool = False,
266
return_document: bool = False,
267
session=None,
268
**kwargs
269
) -> tornado.concurrent.Future:
270
"""Find and update a single document."""
271
272
# Update Operations
273
def update_one(
274
self,
275
filter: Dict[str, Any],
276
update: Dict[str, Any],
277
upsert: bool = False,
278
bypass_document_validation: bool = False,
279
collation=None,
280
array_filters: Optional[List[Dict[str, Any]]] = None,
281
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
282
session=None
283
) -> tornado.concurrent.Future:
284
"""Update a single document."""
285
286
def update_many(
287
self,
288
filter: Dict[str, Any],
289
update: Dict[str, Any],
290
upsert: bool = False,
291
array_filters: Optional[List[Dict[str, Any]]] = None,
292
bypass_document_validation: bool = False,
293
collation=None,
294
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
295
session=None
296
) -> tornado.concurrent.Future:
297
"""Update multiple documents."""
298
299
def replace_one(
300
self,
301
filter: Dict[str, Any],
302
replacement: Dict[str, Any],
303
upsert: bool = False,
304
bypass_document_validation: bool = False,
305
collation=None,
306
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
307
session=None
308
) -> tornado.concurrent.Future:
309
"""Replace a single document."""
310
311
# Delete Operations
312
def delete_one(
313
self,
314
filter: Dict[str, Any],
315
collation=None,
316
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
317
session=None
318
) -> tornado.concurrent.Future:
319
"""Delete a single document."""
320
321
def delete_many(
322
self,
323
filter: Dict[str, Any],
324
collation=None,
325
hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
326
session=None
327
) -> tornado.concurrent.Future:
328
"""Delete multiple documents."""
329
330
# Count Operations
331
def count_documents(
332
self,
333
filter: Dict[str, Any],
334
session=None,
335
**kwargs
336
) -> tornado.concurrent.Future:
337
"""Count documents matching filter."""
338
339
def estimated_document_count(self, **kwargs) -> tornado.concurrent.Future:
340
"""Estimate total document count."""
341
342
# Index Operations
343
def create_index(
344
self,
345
keys: Union[str, List[Tuple[str, int]]],
346
session=None,
347
**kwargs
348
) -> tornado.concurrent.Future:
349
"""Create a single index."""
350
351
def create_indexes(
352
self,
353
indexes: List[Dict[str, Any]],
354
session=None,
355
**kwargs
356
) -> tornado.concurrent.Future:
357
"""Create multiple indexes."""
358
359
def drop_index(
360
self,
361
index: Union[str, List[Tuple[str, int]]],
362
session=None,
363
**kwargs
364
) -> tornado.concurrent.Future:
365
"""Drop a single index."""
366
367
def list_indexes(self, session=None) -> MotorCommandCursor:
368
"""List all indexes on the collection."""
369
370
# Aggregation Operations
371
def aggregate(
372
self,
373
pipeline: List[Dict[str, Any]],
374
session=None,
375
**kwargs
376
) -> MotorCommandCursor:
377
"""Execute an aggregation pipeline."""
378
379
def distinct(
380
self,
381
key: str,
382
filter: Optional[Dict[str, Any]] = None,
383
session=None,
384
**kwargs
385
) -> tornado.concurrent.Future:
386
"""Get distinct values for a field."""
387
388
# Bulk Operations
389
def bulk_write(
390
self,
391
requests: List[Any],
392
ordered: bool = True,
393
bypass_document_validation: bool = False,
394
session=None
395
) -> tornado.concurrent.Future:
396
"""Execute bulk write operations."""
397
398
# Change Streams
399
def watch(
400
self,
401
pipeline: Optional[List[Dict[str, Any]]] = None,
402
full_document: Optional[str] = None,
403
resume_after: Optional[Dict[str, Any]] = None,
404
max_await_time_ms: Optional[int] = None,
405
batch_size: Optional[int] = None,
406
collation=None,
407
start_at_operation_time=None,
408
session=None,
409
start_after: Optional[Dict[str, Any]] = None,
410
**kwargs
411
) -> MotorChangeStream:
412
"""Watch for changes on the collection."""
413
414
# Collection Management
415
def drop(self, session=None) -> tornado.concurrent.Future:
416
"""Drop the collection."""
417
418
def rename(self, new_name: str, session=None, **kwargs) -> tornado.concurrent.Future:
419
"""Rename the collection."""
420
```
421
422
### Tornado Client Session
423
424
Client session for transaction support and causally consistent reads in Tornado applications.
425
426
```python { .api }
427
class MotorClientSession:
428
"""
429
A session for ordering sequential operations and transactions.
430
431
Created via MotorClient.start_session(), not directly instantiated.
432
"""
433
434
# Properties
435
@property
436
def client(self) -> MotorClient:
437
"""The client this session was created from."""
438
439
@property
440
def cluster_time(self) -> Optional[Dict[str, Any]]:
441
"""The cluster time returned by the last operation."""
442
443
@property
444
def has_ended(self) -> bool:
445
"""Whether this session has ended."""
446
447
@property
448
def in_transaction(self) -> bool:
449
"""Whether this session is in an active transaction."""
450
451
@property
452
def operation_time(self) -> Optional[Any]:
453
"""The operation time returned by the last operation."""
454
455
@property
456
def options(self) -> Dict[str, Any]:
457
"""The options used to create this session."""
458
459
@property
460
def session_id(self) -> Dict[str, Any]:
461
"""A BSON document identifying this session."""
462
463
# Transaction Methods
464
def start_transaction(
465
self,
466
read_concern: Optional[Any] = None,
467
write_concern: Optional[Any] = None,
468
read_preference: Optional[Any] = None,
469
max_commit_time_ms: Optional[int] = None
470
) -> Any:
471
"""
472
Start a multi-statement transaction.
473
474
Returns a context manager for the transaction.
475
Use with context manager syntax.
476
477
Parameters:
478
- read_concern: Read concern for the transaction
479
- write_concern: Write concern for the transaction
480
- read_preference: Read preference for the transaction
481
- max_commit_time_ms: Maximum time for commit operation
482
483
Returns:
484
Transaction context manager
485
"""
486
487
def commit_transaction(self) -> tornado.concurrent.Future:
488
"""Commit the current transaction."""
489
490
def abort_transaction(self) -> tornado.concurrent.Future:
491
"""Abort the current transaction."""
492
493
def with_transaction(
494
self,
495
coro: Callable,
496
read_concern: Optional[Any] = None,
497
write_concern: Optional[Any] = None,
498
read_preference: Optional[Any] = None,
499
max_commit_time_ms: Optional[int] = None
500
) -> tornado.concurrent.Future:
501
"""
502
Execute a coroutine within a transaction.
503
504
Automatically handles transaction retry logic for transient errors.
505
Will retry the entire transaction for up to 120 seconds.
506
507
Parameters:
508
- coro: Function that takes this session as first argument
509
- read_concern: Read concern for the transaction
510
- write_concern: Write concern for the transaction
511
- read_preference: Read preference for the transaction
512
- max_commit_time_ms: Maximum time for commit operation
513
514
Returns:
515
Tornado Future with return value from the coroutine
516
"""
517
518
# Session Management
519
def end_session(self) -> tornado.concurrent.Future:
520
"""End this session."""
521
522
def advance_cluster_time(self, cluster_time: Dict[str, Any]) -> None:
523
"""Advance the cluster time for this session."""
524
525
def advance_operation_time(self, operation_time: Any) -> None:
526
"""Advance the operation time for this session."""
527
```
528
529
## Usage Examples
530
531
### Basic Tornado Operations
532
533
```python
534
import tornado.ioloop
535
import motor.motor_tornado
536
537
async def example():
538
client = motor.motor_tornado.MotorClient()
539
db = client.test_database
540
collection = db.test_collection
541
542
# Insert operations
543
result = await collection.insert_one({"name": "Alice", "age": 30})
544
print(f"Inserted ID: {result.inserted_id}")
545
546
results = await collection.insert_many([
547
{"name": "Bob", "age": 25},
548
{"name": "Charlie", "age": 35}
549
])
550
print(f"Inserted IDs: {results.inserted_ids}")
551
552
# Find operations
553
document = await collection.find_one({"name": "Alice"})
554
print(f"Found: {document}")
555
556
# Update operations
557
result = await collection.update_one(
558
{"name": "Alice"},
559
{"$set": {"age": 31}}
560
)
561
print(f"Modified {result.modified_count} document(s)")
562
563
# Delete operations
564
result = await collection.delete_one({"name": "Alice"})
565
print(f"Deleted {result.deleted_count} document(s)")
566
567
client.close()
568
569
# Run with Tornado IOLoop
570
tornado.ioloop.IOLoop.current().run_sync(example)
571
```
572
573
### Callback-Style Operations (Legacy)
574
575
```python
576
import tornado.ioloop
577
import motor.motor_tornado
578
579
def handle_result(result, error):
580
if error:
581
print(f"Error: {error}")
582
else:
583
print(f"Result: {result}")
584
tornado.ioloop.IOLoop.current().stop()
585
586
def callback_example():
587
client = motor.motor_tornado.MotorClient()
588
collection = client.test_database.test_collection
589
590
# Using callbacks (deprecated style)
591
future = collection.find_one({"name": "Alice"})
592
tornado.ioloop.IOLoop.current().add_future(
593
future,
594
lambda f: handle_result(f.result(), f.exception())
595
)
596
597
tornado.ioloop.IOLoop.current().run_sync(callback_example)
598
```
599
600
### Generator-Style Operations (Legacy)
601
602
```python
603
import tornado.gen
604
import tornado.ioloop
605
import motor.motor_tornado
606
607
@tornado.gen.coroutine
608
def gen_example():
609
client = motor.motor_tornado.MotorClient()
610
collection = client.test_database.test_collection
611
612
# Generator-based coroutines (legacy)
613
try:
614
result = yield collection.insert_one({"name": "test"})
615
print(f"Inserted: {result.inserted_id}")
616
617
document = yield collection.find_one({"name": "test"})
618
print(f"Found: {document}")
619
finally:
620
client.close()
621
622
tornado.ioloop.IOLoop.current().run_sync(gen_example)
623
```
624
625
### Web Application Integration
626
627
```python
628
import tornado.web
629
import tornado.ioloop
630
import motor.motor_tornado
631
632
class MainHandler(tornado.web.RequestHandler):
633
async def get(self):
634
db = self.application.settings['db']
635
collection = db.users
636
637
# Find users
638
users = []
639
async for user in collection.find().limit(10):
640
users.append(user)
641
642
self.write({"users": users})
643
644
def make_app():
645
client = motor.motor_tornado.MotorClient()
646
647
return tornado.web.Application([
648
(r"/", MainHandler),
649
], db=client.test_database)
650
651
if __name__ == "__main__":
652
app = make_app()
653
app.listen(8888)
654
tornado.ioloop.IOLoop.current().start()
655
```
656
657
## Types
658
659
```python { .api }
660
from typing import Any, Optional, Union, Dict, List, Tuple, Set
661
import tornado.concurrent
662
663
MotorClientSession = Any # Actual session type from motor
664
MotorCommandCursor = Any # Command cursor type
665
MotorCursor = Any # Query cursor type
666
MotorChangeStream = Any # Change stream type
667
```