Non-blocking MongoDB driver for Python asyncio and Tornado applications
—
Cursor functionality for iterating over query results, command results, and change streams. Motor provides comprehensive cursor support with async iteration, traditional cursor methods, and framework-specific optimizations.
Cursor for iterating over document query results with support for sorting, limiting, skipping, and async iteration.
# AsyncIO Query Cursor
class AsyncIOMotorCursor:
    """
    Cursor over the documents matched by a query (asyncio flavour).

    Configuration methods return a cursor so that calls can be chained;
    execution methods are coroutines and must be awaited.
    """

    # Cursor Configuration
    def limit(self, limit: int) -> AsyncIOMotorCursor:
        """Limit the number of results returned."""

    def skip(self, skip: int) -> AsyncIOMotorCursor:
        """Skip a number of documents."""

    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> AsyncIOMotorCursor:
        """
        Sort the results.

        Parameters:
        - key_or_list: Field name or list of (field, direction) tuples
        - direction: 1 for ascending, -1 for descending (when key_or_list is string)
        """

    def batch_size(self, batch_size: int) -> AsyncIOMotorCursor:
        """Set the batch size for cursor operations."""

    def max_time_ms(self, max_time_ms: int) -> AsyncIOMotorCursor:
        """Set maximum time in milliseconds for cursor operations."""

    def hint(self, index: Union[str, List[Tuple[str, int]]]) -> AsyncIOMotorCursor:
        """Hint which index to use."""

    def comment(self, comment: str) -> AsyncIOMotorCursor:
        """Add a comment to the cursor."""

    def collation(self, collation: Dict[str, Any]) -> AsyncIOMotorCursor:
        """Set collation options."""

    def allow_partial_results(self, allow_partial_results: bool) -> AsyncIOMotorCursor:
        """Allow partial results from mongos if some shards are down."""

    # Cursor Execution
    async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Convert cursor to a list.

        Parameters:
        - length: Maximum number of documents to return (None for all)

        Returns:
        List of documents
        """

    async def count(self, with_limit_and_skip: bool = False) -> int:
        """
        Count documents (deprecated, use count_documents instead).
        """

    async def distinct(self, key: str) -> List[Any]:
        """
        Get the distinct values for a key among the documents matched by this cursor.

        This is an execution method, not a configuration method: it must be
        awaited and yields the values themselves rather than another cursor.

        Parameters:
        - key: Field name to collect distinct values for

        Returns:
        List of distinct values
        """

    # Async Iterator Protocol
    def __aiter__(self) -> AsyncIOMotorCursor:
        """Return self for async iteration."""

    async def __anext__(self) -> Dict[str, Any]:
        """Get the next document."""

    # Cursor Properties
    @property
    def address(self) -> Optional[Tuple[str, int]]:
        """Server address for this cursor."""

    @property
    def cursor_id(self) -> Optional[int]:
        """Cursor ID on the server."""

    @property
    def alive(self) -> bool:
        """Whether the cursor is still alive on the server."""

    # Cursor Management
    async def close(self) -> None:
        """Close the cursor."""

    def clone(self) -> AsyncIOMotorCursor:
        """Create a copy of this cursor."""
# Tornado Query Cursor
class MotorCursor:
# Cursor Configuration (identical API, returns MotorCursor)
def limit(self, limit: int) -> MotorCursor: ...
def skip(self, skip: int) -> MotorCursor: ...
def sort(
self,
key_or_list: Union[str, List[Tuple[str, int]]],
direction: Optional[int] = None
) -> MotorCursor: ...
def batch_size(self, batch_size: int) -> MotorCursor: ...
def max_time_ms(self, max_time_ms: int) -> MotorCursor: ...
def hint(self, index: Union[str, List[Tuple[str, int]]]) -> MotorCursor: ...
def comment(self, comment: str) -> MotorCursor: ...
def collation(self, collation: Dict[str, Any]) -> MotorCursor: ...
def allow_partial_results(self, allow_partial_results: bool) -> MotorCursor: ...
# Cursor Execution (returns Tornado Futures)
def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...
def count(self, with_limit_and_skip: bool = False) -> tornado.concurrent.Future: ...
def distinct(self, key: str) -> tornado.concurrent.Future: ...
# Properties
@property
def address(self) -> Optional[Tuple[str, int]]: ...
@property
def cursor_id(self) -> Optional[int]: ...
@property
def alive(self) -> bool: ...
# Management
def close(self) -> tornado.concurrent.Future: ...
def clone(self) -> MotorCursor: ...
# Legacy iteration methods
def each(self, callback) -> None:
"""Deprecated: Iterate with callback."""
def next_object(self) -> tornado.concurrent.Future:
"""Deprecated: Get next object."""

Cursor for iterating over database command results like aggregation pipelines and administrative commands.
# AsyncIO Command Cursor
class AsyncIOMotorCommandCursor:
"""Cursor over the results of a database command, such as an aggregation pipeline (asyncio flavour)."""
# Cursor Configuration
def batch_size(self, batch_size: int) -> AsyncIOMotorCommandCursor:
"""Set the batch size for cursor operations."""
def max_time_ms(self, max_time_ms: int) -> AsyncIOMotorCommandCursor:
"""Set maximum time in milliseconds for cursor operations."""
# Cursor Execution
async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Convert cursor to a list.
Parameters:
- length: Maximum number of documents to return (None for all)
Returns:
List of documents
"""
# Async Iterator Protocol
def __aiter__(self) -> AsyncIOMotorCommandCursor:
"""Return self for async iteration."""
async def __anext__(self) -> Dict[str, Any]:
"""Get the next document."""
# Cursor Properties
@property
def address(self) -> Optional[Tuple[str, int]]:
"""Server address for this cursor."""
@property
def cursor_id(self) -> Optional[int]:
"""Cursor ID on the server."""
@property
def alive(self) -> bool:
"""Whether the cursor is still alive on the server."""
# Cursor Management
async def close(self) -> None:
"""Close the cursor."""
# Tornado Command Cursor
class MotorCommandCursor:
def batch_size(self, batch_size: int) -> MotorCommandCursor: ...
def max_time_ms(self, max_time_ms: int) -> MotorCommandCursor: ...
def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...
@property
def address(self) -> Optional[Tuple[str, int]]: ...
@property
def cursor_id(self) -> Optional[int]: ...
@property
def alive(self) -> bool: ...
def close(self) -> tornado.concurrent.Future: ...
# Legacy methods
def each(self, callback) -> None: ...
def next_object(self) -> tornado.concurrent.Future: ...

Special cursor type for deferred command execution, used primarily for aggregation operations that return cursors.
# AsyncIO Latent Command Cursor
class AsyncIOMotorLatentCommandCursor(AsyncIOMotorCommandCursor):
"""
A command cursor that defers execution until first iteration.
Created by operations like aggregate() that return cursors.
The actual command isn't sent to MongoDB until iteration begins.
"""
def batch_size(self, batch_size: int) -> AsyncIOMotorLatentCommandCursor:
"""Set the batch size and return self for chaining."""
async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:
"""
Convert cursor to a list, executing the deferred command.
Parameters:
- length: Maximum number of documents to return (None for all)
"""
def __aiter__(self) -> AsyncIOMotorLatentCommandCursor:
"""Return self for async iteration."""
async def __anext__(self) -> Dict[str, Any]:
"""Get the next document, executing command on first call."""
# Tornado Latent Command Cursor
class MotorLatentCommandCursor(MotorCommandCursor):
"""
A command cursor that defers execution until first iteration.
Created by operations like aggregate() that return cursors.
The actual command isn't sent to MongoDB until iteration begins.
"""
def batch_size(self, batch_size: int) -> MotorLatentCommandCursor:
"""Set the batch size and return self for chaining."""
def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future:
"""Convert cursor to a list, executing the deferred command."""
def each(self, callback) -> None:
"""Iterate with callback, executing command on first call."""
def next_object(self) -> tornado.concurrent.Future:
"""Get next object, executing command on first call."""

Specialized cursors for handling raw BSON data with minimal processing overhead.
# AsyncIO Raw Batch Cursor
class AsyncIOMotorRawBatchCursor(AsyncIOMotorCursor):
"""Cursor that returns raw BSON bytes instead of decoded documents."""
async def __anext__(self) -> bytes:
"""Get the next raw batch: undecoded bytes that may hold several BSON documents."""
# AsyncIO Raw Batch Command Cursor
class AsyncIOMotorRawBatchCommandCursor(AsyncIOMotorCommandCursor):
"""Command cursor that returns raw BSON bytes."""
async def __anext__(self) -> bytes:
"""Get the next raw batch: undecoded bytes that may hold several BSON documents."""
# Tornado equivalents
class MotorRawBatchCursor(MotorCursor):
"""Tornado cursor for raw BSON data."""
class MotorRawBatchCommandCursor(MotorCommandCursor):
"""Tornado command cursor for raw BSON data."""

import asyncio
import motor.motor_asyncio
async def cursor_example():
    """
    Demonstrate basic query-cursor usage.

    Inserts five sample people into ``test_database.test_collection``, then
    reads them back three ways: async iteration, ``to_list()``, and a chained
    sort/limit/skip cursor. The client is closed at the end.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.test_collection

    # Seed data for the queries below.
    sample_users = [
        {"name": "Alice", "age": 30, "city": "New York"},
        {"name": "Bob", "age": 25, "city": "San Francisco"},
        {"name": "Charlie", "age": 35, "city": "Chicago"},
        {"name": "Diana", "age": 28, "city": "New York"},
        {"name": "Eve", "age": 32, "city": "San Francisco"},
    ]
    await collection.insert_many(sample_users)

    # 1) Stream matching documents with ``async for``.
    adults = collection.find({"age": {"$gte": 25}})
    print("All users 25 or older:")
    async for user in adults:
        print(f" {user['name']} ({user['age']}) - {user['city']}")

    # 2) Collect every match into a list in one call.
    new_yorkers = await collection.find({"city": "New York"}).to_list(length=None)
    print(f"\nFound {len(new_yorkers)} users in New York")

    # 3) Chain configuration calls; each returns a cursor.
    oldest = (
        collection.find()
        .sort("age", -1)
        .limit(3)
        .skip(1)
    )
    print("\nTop 3 oldest users (skipping 1st):")
    async for user in oldest:
        print(f" {user['name']} ({user['age']})")

    client.close()
asyncio.run(cursor_example())

import asyncio
import motor.motor_asyncio
import pymongo
async def advanced_cursor_example():
    """
    Demonstrate cursor options on a small product catalogue.

    Covers projection, compound sort, limit, batch size, index hint,
    collation, query comment, and a server-side time limit. The client is
    closed at the end.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.products

    # The hint() further down names this compound index.
    await collection.create_index([("price", 1), ("category", 1)])

    # Sample catalogue.
    catalogue = [
        {"name": "Laptop", "price": 999, "category": "Electronics", "brand": "Dell"},
        {"name": "Phone", "price": 699, "category": "Electronics", "brand": "Apple"},
        {"name": "Tablet", "price": 399, "category": "Electronics", "brand": "Samsung"},
        {"name": "Book", "price": 19, "category": "Books", "brand": "Penguin"},
        {"name": "Headphones", "price": 199, "category": "Electronics", "brand": "Sony"},
    ]
    await collection.insert_many(catalogue)

    # Several options combined on one cursor.
    query = {"category": "Electronics"}
    projection = {"name": 1, "price": 1, "brand": 1}
    sort_spec = [
        ("price", pymongo.DESCENDING),
        ("name", pymongo.ASCENDING),
    ]
    electronics = (
        collection.find(query, projection)
        .sort(sort_spec)
        .limit(10)
        .batch_size(2)
        .hint([("price", 1), ("category", 1)])
    )
    print("Electronics sorted by price (desc), then name (asc):")
    async for item in electronics:
        print(f" {item['name']}: ${item['price']} ({item['brand']})")

    # Collation strength 2 makes the sort ignore case.
    case_insensitive = {"locale": "en", "strength": 2}
    by_name = collection.find().sort("name", 1).collation(case_insensitive)
    print("\nProducts sorted case-insensitively:")
    async for item in by_name:
        print(f" {item['name']}")

    # Tag the query with a comment and cap its run time at 5 seconds.
    affordable = (
        collection.find({"price": {"$lt": 500}})
        .comment("Finding affordable products")
        .max_time_ms(5000)
    )
    print("\nAffordable products (under $500):")
    try:
        async for item in affordable:
            print(f" {item['name']}: ${item['price']}")
    except pymongo.errors.ExecutionTimeout:
        print("Query timed out!")

    client.close()
asyncio.run(advanced_cursor_example())

import asyncio
import motor.motor_asyncio
async def command_cursor_example():
    """
    Demonstrate command cursors from aggregate(), list_collections() and list_databases().

    ``collection.aggregate()`` returns a cursor directly. ``db.list_collections()``
    and ``client.list_databases()`` are coroutines that resolve to a cursor, so
    they must be awaited before the result can be used with ``async for``.
    The client is closed at the end.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    collection = db.sales

    # Insert sample sales data
    await collection.insert_many([
        {"product": "Laptop", "amount": 999, "date": "2023-01-15", "region": "North"},
        {"product": "Phone", "amount": 699, "date": "2023-01-16", "region": "South"},
        {"product": "Laptop", "amount": 999, "date": "2023-01-17", "region": "North"},
        {"product": "Tablet", "amount": 399, "date": "2023-01-18", "region": "East"},
        {"product": "Phone", "amount": 699, "date": "2023-01-19", "region": "West"},
    ])

    # Aggregation pipeline: total and count per product, largest total first
    pipeline = [
        {"$group": {
            "_id": "$product",
            "total_sales": {"$sum": "$amount"},
            "count": {"$sum": 1},
        }},
        {"$sort": {"total_sales": -1}},
    ]

    # aggregate() is not awaited: it returns the command cursor itself
    cursor = collection.aggregate(pipeline)
    print("Sales by Product:")
    async for result in cursor:
        print(f" {result['_id']}: ${result['total_sales']} ({result['count']} sales)")

    # list_collections() is a coroutine; await it to obtain the cursor
    cursor = await db.list_collections()
    print("\nCollections in database:")
    async for collection_info in cursor:
        print(f" {collection_info['name']}: {collection_info['type']}")

    # list_databases() is a coroutine as well
    cursor = await client.list_databases()
    print("\nDatabases:")
    async for db_info in cursor:
        print(f" {db_info['name']}: {db_info['sizeOnDisk']} bytes")

    client.close()
asyncio.run(command_cursor_example())

import asyncio
import motor.motor_asyncio
import time
async def cursor_performance_example():
    """
    Time several ways of reading a 10,000-document collection.

    Inserts the test data into ``test_database.large_collection``, measures
    four read strategies, prints the elapsed time of each, then drops the
    collection and closes the client.

    Timings use ``time.perf_counter()``, a monotonic high-resolution clock,
    so they are not distorted by system clock adjustments.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.large_collection

    # Insert large amount of test data
    print("Inserting test data...")
    batch_size = 1000
    for i in range(10):  # 10 batches of 1,000 = 10,000 documents
        batch = [
            {"index": i * batch_size + j, "value": f"value_{i * batch_size + j}"}
            for j in range(batch_size)
        ]
        await collection.insert_many(batch)

    print("Testing cursor performance...")

    # Test 1: Default batch size
    start_time = time.perf_counter()
    cursor = collection.find()
    count = 0
    async for _ in cursor:
        count += 1
    default_time = time.perf_counter() - start_time
    print(f"Default batch size: {count} docs in {default_time:.2f}s")

    # Test 2: Large batch size
    start_time = time.perf_counter()
    cursor = collection.find().batch_size(1000)
    count = 0
    async for _ in cursor:
        count += 1
    large_batch_time = time.perf_counter() - start_time
    print(f"Large batch size (1000): {count} docs in {large_batch_time:.2f}s")

    # Test 3: Convert to list. The server still returns results batch by
    # batch, so this is not a single round trip; it only avoids the
    # per-document iteration overhead in Python.
    start_time = time.perf_counter()
    cursor = collection.find()
    docs = await cursor.to_list(length=None)
    list_time = time.perf_counter() - start_time
    print(f"to_list(): {len(docs)} docs in {list_time:.2f}s")

    # Test 4: Limited results
    start_time = time.perf_counter()
    cursor = collection.find().limit(1000)
    docs = await cursor.to_list(1000)
    limited_time = time.perf_counter() - start_time
    print(f"Limited (1000): {len(docs)} docs in {limited_time:.2f}s")

    # Cleanup
    await collection.drop()
    client.close()
asyncio.run(cursor_performance_example())

import asyncio
import motor.motor_asyncio
import pymongo.errors
async def cursor_error_handling_example():
    """
    Demonstrate catching errors raised while running cursors.

    Runs three independent scenarios against ``test_database.test_collection``:
    a 1 ms time limit, a sort with an invalid direction, and a query issued
    right after the collection is dropped. The client is closed at the end.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.test_collection

    # Scenario 1: a very short server-side time limit.
    try:
        rushed = collection.find().max_time_ms(1)
        async for document in rushed:
            print(document)
    except pymongo.errors.ExecutionTimeout:
        print("Cursor operation timed out")

    # Scenario 2: 999 is not a valid sort direction.
    try:
        badly_sorted = collection.find().sort("invalid_field", 999)
        await badly_sorted.to_list(None)
    except pymongo.errors.OperationFailure as e:
        print(f"Sort operation failed: {e}")

    # Scenario 3: query a collection that has just been dropped.
    try:
        await collection.drop()
        after_drop = collection.find()
        # May yield nothing or may fail, depending on timing.
        async for document in after_drop:
            print(document)
    except pymongo.errors.OperationFailure as e:
        print(f"Cursor on dropped collection: {e}")

    client.close()
asyncio.run(cursor_error_handling_example())

from typing import Any, Optional, Union, Dict, List, Tuple, Iterator, AsyncIterator
import tornado.concurrent
# Sort specifications
# A single field name, or a list of (field, direction) tuples for compound sorts
SortKey = Union[str, List[Tuple[str, int]]]
SortDirection = int # 1 for ascending, -1 for descending
# Cursor result types
# A decoded document, as produced by the non-raw cursors' __anext__/to_list
Document = Dict[str, Any]
# Undecoded BSON, as produced by the raw-batch cursors' __anext__
RawBSONBytes = bytes
# Cursor state
# Matches the return annotation of the cursors' cursor_id property
CursorId = Optional[int]
ServerAddress = Optional[Tuple[str, int]]

Install with Tessl CLI
npx tessl i tessl/pypi-motor