0
# Cursor Operations
1
2
Cursor functionality for iterating over query results, command results, and change streams. Motor provides comprehensive cursor support with async iteration, traditional cursor methods, and framework-specific optimizations.
3
4
## Capabilities
5
6
### Query Cursor
7
8
Cursor for iterating over document query results with support for sorting, limiting, skipping, and async iteration.
9
10
```python { .api }
11
# AsyncIO Query Cursor
12
class AsyncIOMotorCursor:
13
# Cursor Configuration
14
def limit(self, limit: int) -> AsyncIOMotorCursor:
15
"""Limit the number of results returned."""
16
17
def skip(self, skip: int) -> AsyncIOMotorCursor:
18
"""Skip a number of documents."""
19
20
def sort(
21
self,
22
key_or_list: Union[str, List[Tuple[str, int]]],
23
direction: Optional[int] = None
24
) -> AsyncIOMotorCursor:
25
"""
26
Sort the results.
27
28
Parameters:
29
- key_or_list: Field name or list of (field, direction) tuples
30
- direction: 1 for ascending, -1 for descending (when key_or_list is string)
31
"""
32
33
def batch_size(self, batch_size: int) -> AsyncIOMotorCursor:
34
"""Set the batch size for cursor operations."""
35
36
def max_time_ms(self, max_time_ms: int) -> AsyncIOMotorCursor:
37
"""Set maximum time in milliseconds for cursor operations."""
38
39
def hint(self, index: Union[str, List[Tuple[str, int]]]) -> AsyncIOMotorCursor:
40
"""Hint which index to use."""
41
42
def comment(self, comment: str) -> AsyncIOMotorCursor:
43
"""Add a comment to the cursor."""
44
45
def collation(self, collation: Dict[str, Any]) -> AsyncIOMotorCursor:
46
"""Set collation options."""
47
48
def allow_partial_results(self, allow_partial_results: bool) -> AsyncIOMotorCursor:
49
"""Allow partial results from mongos if some shards are down."""
50
51
# Cursor Execution
52
async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:
53
"""
54
Convert cursor to a list.
55
56
Parameters:
57
- length: Maximum number of documents to return (None for all)
58
59
Returns:
60
List of documents
61
"""
62
63
async def count(self, with_limit_and_skip: bool = False) -> int:
64
"""
65
Count documents (deprecated, use count_documents instead).
66
"""
67
68
def distinct(self, key: str) -> AsyncIOMotorCursor:
69
"""Get distinct values for a key."""
70
71
# Async Iterator Protocol
72
def __aiter__(self) -> AsyncIOMotorCursor:
73
"""Return self for async iteration."""
74
75
async def __anext__(self) -> Dict[str, Any]:
76
"""Get the next document."""
77
78
# Cursor Properties
79
@property
80
def address(self) -> Optional[Tuple[str, int]]:
81
"""Server address for this cursor."""
82
83
@property
84
def cursor_id(self) -> Optional[int]:
85
"""Cursor ID on the server."""
86
87
@property
88
def alive(self) -> bool:
89
"""Whether the cursor is still alive on the server."""
90
91
# Cursor Management
92
async def close(self) -> None:
93
"""Close the cursor."""
94
95
def clone(self) -> AsyncIOMotorCursor:
96
"""Create a copy of this cursor."""
97
98
# Tornado Query Cursor
99
class MotorCursor:
100
# Cursor Configuration (identical API, returns MotorCursor)
101
def limit(self, limit: int) -> MotorCursor: ...
102
def skip(self, skip: int) -> MotorCursor: ...
103
def sort(
104
self,
105
key_or_list: Union[str, List[Tuple[str, int]]],
106
direction: Optional[int] = None
107
) -> MotorCursor: ...
108
def batch_size(self, batch_size: int) -> MotorCursor: ...
109
def max_time_ms(self, max_time_ms: int) -> MotorCursor: ...
110
def hint(self, index: Union[str, List[Tuple[str, int]]]) -> MotorCursor: ...
111
def comment(self, comment: str) -> MotorCursor: ...
112
def collation(self, collation: Dict[str, Any]) -> MotorCursor: ...
113
def allow_partial_results(self, allow_partial_results: bool) -> MotorCursor: ...
114
115
# Cursor Execution (returns Tornado Futures)
116
def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...
117
def count(self, with_limit_and_skip: bool = False) -> tornado.concurrent.Future: ...
118
def distinct(self, key: str) -> tornado.concurrent.Future: ...
119
120
# Properties
121
@property
122
def address(self) -> Optional[Tuple[str, int]]: ...
123
@property
124
def cursor_id(self) -> Optional[int]: ...
125
@property
126
def alive(self) -> bool: ...
127
128
# Management
129
def close(self) -> tornado.concurrent.Future: ...
130
def clone(self) -> MotorCursor: ...
131
132
# Legacy iteration methods
133
def each(self, callback) -> None:
134
"""Deprecated: Iterate with callback."""
135
136
def next_object(self) -> tornado.concurrent.Future:
137
"""Deprecated: Get next object."""
138
```
139
140
### Command Cursor
141
142
Cursor for iterating over database command results like aggregation pipelines and administrative commands.
143
144
```python { .api }
145
# AsyncIO Command Cursor
146
class AsyncIOMotorCommandCursor:
147
# Cursor Configuration
148
def batch_size(self, batch_size: int) -> AsyncIOMotorCommandCursor:
149
"""Set the batch size for cursor operations."""
150
151
def max_time_ms(self, max_time_ms: int) -> AsyncIOMotorCommandCursor:
152
"""Set maximum time in milliseconds for cursor operations."""
153
154
# Cursor Execution
155
async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:
156
"""Convert cursor to a list."""
157
158
# Async Iterator Protocol
159
def __aiter__(self) -> AsyncIOMotorCommandCursor:
160
"""Return self for async iteration."""
161
162
async def __anext__(self) -> Dict[str, Any]:
163
"""Get the next document."""
164
165
# Cursor Properties
166
@property
167
def address(self) -> Optional[Tuple[str, int]]:
168
"""Server address for this cursor."""
169
170
@property
171
def cursor_id(self) -> Optional[int]:
172
"""Cursor ID on the server."""
173
174
@property
175
def alive(self) -> bool:
176
"""Whether the cursor is still alive on the server."""
177
178
# Cursor Management
179
async def close(self) -> None:
180
"""Close the cursor."""
181
182
# Tornado Command Cursor
183
class MotorCommandCursor:
184
def batch_size(self, batch_size: int) -> MotorCommandCursor: ...
185
def max_time_ms(self, max_time_ms: int) -> MotorCommandCursor: ...
186
187
def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...
188
189
@property
190
def address(self) -> Optional[Tuple[str, int]]: ...
191
@property
192
def cursor_id(self) -> Optional[int]: ...
193
@property
194
def alive(self) -> bool: ...
195
196
def close(self) -> tornado.concurrent.Future: ...
197
198
# Legacy methods
199
def each(self, callback) -> None: ...
200
def next_object(self) -> tornado.concurrent.Future: ...
201
```
202
203
### Latent Command Cursor
204
205
Special cursor type for deferred command execution, used primarily for aggregation operations that return cursors.
206
207
```python { .api }
208
# AsyncIO Latent Command Cursor
209
class AsyncIOMotorLatentCommandCursor(AsyncIOMotorCommandCursor):
210
"""
211
A command cursor that defers execution until first iteration.
212
213
Created by operations like aggregate() that return cursors.
214
The actual command isn't sent to MongoDB until iteration begins.
215
"""
216
217
def batch_size(self, batch_size: int) -> AsyncIOMotorLatentCommandCursor:
218
"""Set the batch size and return self for chaining."""
219
220
async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:
221
"""Convert cursor to a list, executing the deferred command."""
222
223
def __aiter__(self) -> AsyncIOMotorLatentCommandCursor:
224
"""Return self for async iteration."""
225
226
async def __anext__(self) -> Dict[str, Any]:
227
"""Get the next document, executing command on first call."""
228
229
# Tornado Latent Command Cursor
230
class MotorLatentCommandCursor(MotorCommandCursor):
231
"""
232
A command cursor that defers execution until first iteration.
233
234
Created by operations like aggregate() that return cursors.
235
The actual command isn't sent to MongoDB until iteration begins.
236
"""
237
238
def batch_size(self, batch_size: int) -> MotorLatentCommandCursor:
239
"""Set the batch size and return self for chaining."""
240
241
def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future:
242
"""Convert cursor to a list, executing the deferred command."""
243
244
def each(self, callback) -> None:
245
"""Iterate with callback, executing command on first call."""
246
247
def next_object(self) -> tornado.concurrent.Future:
248
"""Get next object, executing command on first call."""
249
```
250
251
### Raw Batch Cursors
252
253
Specialized cursors for handling raw BSON data with minimal processing overhead.
254
255
```python { .api }
256
# AsyncIO Raw Batch Cursor
257
class AsyncIOMotorRawBatchCursor(AsyncIOMotorCursor):
258
"""Cursor that returns raw BSON bytes instead of decoded documents."""
259
260
async def __anext__(self) -> bytes:
261
"""Get the next raw BSON document."""
262
263
# AsyncIO Raw Batch Command Cursor
264
class AsyncIOMotorRawBatchCommandCursor(AsyncIOMotorCommandCursor):
265
"""Command cursor that returns raw BSON bytes."""
266
267
async def __anext__(self) -> bytes:
268
"""Get the next raw BSON document."""
269
270
# Tornado equivalents
271
class MotorRawBatchCursor(MotorCursor):
272
"""Tornado cursor for raw BSON data."""
273
274
class MotorRawBatchCommandCursor(MotorCommandCursor):
275
"""Tornado command cursor for raw BSON data."""
276
```
277
278
## Usage Examples
279
280
### Basic Cursor Operations
281
282
```python
283
import asyncio
284
import motor.motor_asyncio
285
286
async def cursor_example():
287
client = motor.motor_asyncio.AsyncIOMotorClient()
288
collection = client.test_database.test_collection
289
290
# Insert sample data
291
await collection.insert_many([
292
{"name": "Alice", "age": 30, "city": "New York"},
293
{"name": "Bob", "age": 25, "city": "San Francisco"},
294
{"name": "Charlie", "age": 35, "city": "Chicago"},
295
{"name": "Diana", "age": 28, "city": "New York"},
296
{"name": "Eve", "age": 32, "city": "San Francisco"}
297
])
298
299
# Basic cursor usage
300
cursor = collection.find({"age": {"$gte": 25}})
301
302
# Async iteration
303
print("All users 25 or older:")
304
async for document in cursor:
305
print(f" {document['name']} ({document['age']}) - {document['city']}")
306
307
# Convert to list
308
cursor = collection.find({"city": "New York"})
309
users = await cursor.to_list(length=None)
310
print(f"\nFound {len(users)} users in New York")
311
312
# Cursor chaining
313
cursor = collection.find()\
314
.sort("age", -1)\
315
.limit(3)\
316
.skip(1)
317
318
print("\nTop 3 oldest users (skipping 1st):")
319
async for document in cursor:
320
print(f" {document['name']} ({document['age']})")
321
322
client.close()
323
324
asyncio.run(cursor_example())
325
```
326
327
### Advanced Cursor Configuration
328
329
```python
330
import asyncio
331
import motor.motor_asyncio
332
import pymongo
333
334
async def advanced_cursor_example():
335
client = motor.motor_asyncio.AsyncIOMotorClient()
336
collection = client.test_database.products
337
338
# Create index for examples
339
await collection.create_index([("price", 1), ("category", 1)])
340
341
# Insert sample products
342
await collection.insert_many([
343
{"name": "Laptop", "price": 999, "category": "Electronics", "brand": "Dell"},
344
{"name": "Phone", "price": 699, "category": "Electronics", "brand": "Apple"},
345
{"name": "Tablet", "price": 399, "category": "Electronics", "brand": "Samsung"},
346
{"name": "Book", "price": 19, "category": "Books", "brand": "Penguin"},
347
{"name": "Headphones", "price": 199, "category": "Electronics", "brand": "Sony"}
348
])
349
350
# Complex cursor with multiple options
351
cursor = collection.find(
352
{"category": "Electronics"},
353
{"name": 1, "price": 1, "brand": 1} # Projection
354
).sort([
355
("price", pymongo.DESCENDING),
356
("name", pymongo.ASCENDING)
357
]).limit(10).batch_size(2).hint([("price", 1), ("category", 1)])
358
359
print("Electronics sorted by price (desc), then name (asc):")
360
async for product in cursor:
361
print(f" {product['name']}: ${product['price']} ({product['brand']})")
362
363
# Cursor with collation for case-insensitive sorting
364
cursor = collection.find().sort("name", 1).collation({
365
"locale": "en",
366
"strength": 2 # Case insensitive
367
})
368
369
print("\nProducts sorted case-insensitively:")
370
async for product in cursor:
371
print(f" {product['name']}")
372
373
# Cursor with comment and max time
374
cursor = collection.find({"price": {"$lt": 500}})\
375
.comment("Finding affordable products")\
376
.max_time_ms(5000) # 5 second timeout
377
378
print("\nAffordable products (under $500):")
379
try:
380
async for product in cursor:
381
print(f" {product['name']}: ${product['price']}")
382
except pymongo.errors.ExecutionTimeout:
383
print("Query timed out!")
384
385
client.close()
386
387
asyncio.run(advanced_cursor_example())
388
```
389
390
### Command Cursor Usage
391
392
```python
393
import asyncio
394
import motor.motor_asyncio
395
396
async def command_cursor_example():
397
client = motor.motor_asyncio.AsyncIOMotorClient()
398
db = client.test_database
399
collection = db.sales
400
401
# Insert sample sales data
402
await collection.insert_many([
403
{"product": "Laptop", "amount": 999, "date": "2023-01-15", "region": "North"},
404
{"product": "Phone", "amount": 699, "date": "2023-01-16", "region": "South"},
405
{"product": "Laptop", "amount": 999, "date": "2023-01-17", "region": "North"},
406
{"product": "Tablet", "amount": 399, "date": "2023-01-18", "region": "East"},
407
{"product": "Phone", "amount": 699, "date": "2023-01-19", "region": "West"}
408
])
409
410
# Aggregation pipeline
411
pipeline = [
412
{"$group": {
413
"_id": "$product",
414
"total_sales": {"$sum": "$amount"},
415
"count": {"$sum": 1}
416
}},
417
{"$sort": {"total_sales": -1}}
418
]
419
420
# Get command cursor from aggregation
421
cursor = collection.aggregate(pipeline)
422
423
print("Sales by Product:")
424
async for result in cursor:
425
print(f" {result['_id']}: ${result['total_sales']} ({result['count']} sales)")
426
427
# List collections command cursor
428
cursor = db.list_collections()
429
430
print("\nCollections in database:")
431
async for collection_info in cursor:
432
print(f" {collection_info['name']}: {collection_info['type']}")
433
434
# List databases command cursor
435
cursor = client.list_databases()
436
437
print("\nDatabases:")
438
async for db_info in cursor:
439
print(f" {db_info['name']}: {db_info['sizeOnDisk']} bytes")
440
441
client.close()
442
443
asyncio.run(command_cursor_example())
444
```
445
446
### Cursor Performance Optimization
447
448
```python
449
import asyncio
450
import motor.motor_asyncio
451
import time
452
453
async def cursor_performance_example():
454
client = motor.motor_asyncio.AsyncIOMotorClient()
455
collection = client.test_database.large_collection
456
457
# Insert large amount of test data
458
print("Inserting test data...")
459
batch_size = 1000
460
for i in range(10): # 10,000 documents
461
batch = [
462
{"index": i * batch_size + j, "value": f"value_{i * batch_size + j}"}
463
for j in range(batch_size)
464
]
465
await collection.insert_many(batch)
466
467
print("Testing cursor performance...")
468
469
# Test 1: Default batch size
470
start_time = time.time()
471
cursor = collection.find()
472
count = 0
473
async for doc in cursor:
474
count += 1
475
476
default_time = time.time() - start_time
477
print(f"Default batch size: {count} docs in {default_time:.2f}s")
478
479
# Test 2: Large batch size
480
start_time = time.time()
481
cursor = collection.find().batch_size(1000)
482
count = 0
483
async for doc in cursor:
484
count += 1
485
486
large_batch_time = time.time() - start_time
487
print(f"Large batch size (1000): {count} docs in {large_batch_time:.2f}s")
488
489
# Test 3: Convert to list (single network round trip)
490
start_time = time.time()
491
cursor = collection.find()
492
docs = await cursor.to_list(length=None)
493
list_time = time.time() - start_time
494
print(f"to_list(): {len(docs)} docs in {list_time:.2f}s")
495
496
# Test 4: Limited results
497
start_time = time.time()
498
cursor = collection.find().limit(1000)
499
docs = await cursor.to_list(1000)
500
limited_time = time.time() - start_time
501
print(f"Limited (1000): {len(docs)} docs in {limited_time:.2f}s")
502
503
# Cleanup
504
await collection.drop()
505
client.close()
506
507
asyncio.run(cursor_performance_example())
508
```
509
510
### Cursor Error Handling
511
512
```python
513
import asyncio
514
import motor.motor_asyncio
515
import pymongo.errors
516
517
async def cursor_error_handling_example():
518
client = motor.motor_asyncio.AsyncIOMotorClient()
519
collection = client.test_database.test_collection
520
521
try:
522
# Cursor with timeout
523
cursor = collection.find().max_time_ms(1) # Very short timeout
524
525
async for document in cursor:
526
print(document)
527
528
except pymongo.errors.ExecutionTimeout:
529
print("Cursor operation timed out")
530
531
try:
532
# Invalid sort specification
533
cursor = collection.find().sort("invalid_field", 999) # Invalid direction
534
await cursor.to_list(None)
535
536
except pymongo.errors.OperationFailure as e:
537
print(f"Sort operation failed: {e}")
538
539
try:
540
# Cursor on dropped collection
541
await collection.drop()
542
cursor = collection.find()
543
544
# This might work (empty result) or fail depending on timing
545
async for document in cursor:
546
print(document)
547
548
except pymongo.errors.OperationFailure as e:
549
print(f"Cursor on dropped collection: {e}")
550
551
client.close()
552
553
asyncio.run(cursor_error_handling_example())
554
```
555
556
## Types
557
558
```python { .api }
559
from typing import Any, Optional, Union, Dict, List, Tuple, Iterator, AsyncIterator
560
import tornado.concurrent
561
562
# Sort specifications
563
SortKey = Union[str, List[Tuple[str, int]]]
564
SortDirection = int # 1 for ascending, -1 for descending
565
566
# Cursor result types
567
Document = Dict[str, Any]
568
RawBSONBytes = bytes
569
570
# Cursor state
571
CursorId = Optional[int]
572
ServerAddress = Optional[Tuple[str, int]]
573
```