CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-motor

Non-blocking MongoDB driver for Python asyncio and Tornado applications

Pending
Overview
Eval results
Files

cursor-operations.mddocs/

Cursor Operations

Cursor functionality for iterating over query results, command results, and change streams. Motor provides comprehensive cursor support with async iteration, traditional cursor methods, and framework-specific optimizations.

Capabilities

Query Cursor

Cursor for iterating over document query results with support for sorting, limiting, skipping, and async iteration.

# AsyncIO Query Cursor
class AsyncIOMotorCursor:
    # Cursor Configuration
    def limit(self, limit: int) -> AsyncIOMotorCursor:
        """Limit the number of results returned."""
    
    def skip(self, skip: int) -> AsyncIOMotorCursor:
        """Skip a number of documents."""
    
    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> AsyncIOMotorCursor:
        """
        Sort the results.
        
        Parameters:
        - key_or_list: Field name or list of (field, direction) tuples
        - direction: 1 for ascending, -1 for descending (when key_or_list is string)
        """
    
    def batch_size(self, batch_size: int) -> AsyncIOMotorCursor:
        """Set the batch size for cursor operations."""
    
    def max_time_ms(self, max_time_ms: int) -> AsyncIOMotorCursor:
        """Set maximum time in milliseconds for cursor operations."""
    
    def hint(self, index: Union[str, List[Tuple[str, int]]]) -> AsyncIOMotorCursor:
        """Hint which index to use."""
    
    def comment(self, comment: str) -> AsyncIOMotorCursor:
        """Add a comment to the cursor."""
    
    def collation(self, collation: Dict[str, Any]) -> AsyncIOMotorCursor:
        """Set collation options."""
    
    def allow_partial_results(self, allow_partial_results: bool) -> AsyncIOMotorCursor:
        """Allow partial results from mongos if some shards are down."""
    
    # Cursor Execution
    async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        Convert cursor to a list.
        
        Parameters:
        - length: Maximum number of documents to return (None for all)
        
        Returns:
        List of documents
        """
    
    async def count(self, with_limit_and_skip: bool = False) -> int:
        """
        Count documents (deprecated, use count_documents instead).
        """
    
    def distinct(self, key: str) -> AsyncIOMotorCursor:
        """Get distinct values for a key."""
    
    # Async Iterator Protocol
    def __aiter__(self) -> AsyncIOMotorCursor:
        """Return self for async iteration."""
    
    async def __anext__(self) -> Dict[str, Any]:
        """Get the next document."""
    
    # Cursor Properties
    @property
    def address(self) -> Optional[Tuple[str, int]]:
        """Server address for this cursor."""
    
    @property
    def cursor_id(self) -> Optional[int]:
        """Cursor ID on the server."""
    
    @property
    def alive(self) -> bool:
        """Whether the cursor is still alive on the server."""
    
    # Cursor Management
    async def close(self) -> None:
        """Close the cursor."""
    
    def clone(self) -> AsyncIOMotorCursor:
        """Create a copy of this cursor."""

# Tornado Query Cursor
class MotorCursor:
    # Cursor Configuration (identical API, returns MotorCursor)
    def limit(self, limit: int) -> MotorCursor: ...
    def skip(self, skip: int) -> MotorCursor: ...
    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> MotorCursor: ...
    def batch_size(self, batch_size: int) -> MotorCursor: ...
    def max_time_ms(self, max_time_ms: int) -> MotorCursor: ...
    def hint(self, index: Union[str, List[Tuple[str, int]]]) -> MotorCursor: ...
    def comment(self, comment: str) -> MotorCursor: ...
    def collation(self, collation: Dict[str, Any]) -> MotorCursor: ...
    def allow_partial_results(self, allow_partial_results: bool) -> MotorCursor: ...
    
    # Cursor Execution (returns Tornado Futures)
    def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...
    def count(self, with_limit_and_skip: bool = False) -> tornado.concurrent.Future: ...
    def distinct(self, key: str) -> tornado.concurrent.Future: ...
    
    # Properties
    @property
    def address(self) -> Optional[Tuple[str, int]]: ...
    @property
    def cursor_id(self) -> Optional[int]: ...
    @property
    def alive(self) -> bool: ...
    
    # Management
    def close(self) -> tornado.concurrent.Future: ...
    def clone(self) -> MotorCursor: ...
    
    # Legacy iteration methods
    def each(self, callback) -> None:
        """Deprecated: Iterate with callback."""
    
    def next_object(self) -> tornado.concurrent.Future:
        """Deprecated: Get next object."""

Command Cursor

Cursor for iterating over database command results like aggregation pipelines and administrative commands.

# AsyncIO Command Cursor
class AsyncIOMotorCommandCursor:
    # Cursor Configuration
    def batch_size(self, batch_size: int) -> AsyncIOMotorCommandCursor:
        """Set the batch size for cursor operations."""
    
    def max_time_ms(self, max_time_ms: int) -> AsyncIOMotorCommandCursor:
        """Set maximum time in milliseconds for cursor operations."""
    
    # Cursor Execution
    async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:
        """Convert cursor to a list."""
    
    # Async Iterator Protocol
    def __aiter__(self) -> AsyncIOMotorCommandCursor:
        """Return self for async iteration."""
    
    async def __anext__(self) -> Dict[str, Any]:
        """Get the next document."""
    
    # Cursor Properties
    @property
    def address(self) -> Optional[Tuple[str, int]]:
        """Server address for this cursor."""
    
    @property
    def cursor_id(self) -> Optional[int]:
        """Cursor ID on the server."""
    
    @property
    def alive(self) -> bool:
        """Whether the cursor is still alive on the server."""
    
    # Cursor Management
    async def close(self) -> None:
        """Close the cursor."""

# Tornado Command Cursor
class MotorCommandCursor:
    def batch_size(self, batch_size: int) -> MotorCommandCursor: ...
    def max_time_ms(self, max_time_ms: int) -> MotorCommandCursor: ...
    
    def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...
    
    @property
    def address(self) -> Optional[Tuple[str, int]]: ...
    @property
    def cursor_id(self) -> Optional[int]: ...
    @property
    def alive(self) -> bool: ...
    
    def close(self) -> tornado.concurrent.Future: ...
    
    # Legacy methods
    def each(self, callback) -> None: ...
    def next_object(self) -> tornado.concurrent.Future: ...

Latent Command Cursor

Special cursor type for deferred command execution, used primarily for aggregation operations that return cursors.

# AsyncIO Latent Command Cursor
class AsyncIOMotorLatentCommandCursor(AsyncIOMotorCommandCursor):
    """
    A command cursor that defers execution until first iteration.
    
    Created by operations like aggregate() that return cursors.
    The actual command isn't sent to MongoDB until iteration begins.
    """
    
    def batch_size(self, batch_size: int) -> AsyncIOMotorLatentCommandCursor:
        """Set the batch size and return self for chaining."""
    
    async def to_list(self, length: Optional[int] = None) -> List[Dict[str, Any]]:
        """Convert cursor to a list, executing the deferred command."""
    
    def __aiter__(self) -> AsyncIOMotorLatentCommandCursor:
        """Return self for async iteration."""
    
    async def __anext__(self) -> Dict[str, Any]:
        """Get the next document, executing command on first call."""

# Tornado Latent Command Cursor
class MotorLatentCommandCursor(MotorCommandCursor):
    """
    A command cursor that defers execution until first iteration.
    
    Created by operations like aggregate() that return cursors.
    The actual command isn't sent to MongoDB until iteration begins.
    """
    
    def batch_size(self, batch_size: int) -> MotorLatentCommandCursor:
        """Set the batch size and return self for chaining."""
    
    def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future:
        """Convert cursor to a list, executing the deferred command."""
    
    def each(self, callback) -> None:
        """Iterate with callback, executing command on first call."""
    
    def next_object(self) -> tornado.concurrent.Future:
        """Get next object, executing command on first call."""

Raw Batch Cursors

Specialized cursors for handling raw BSON data with minimal processing overhead.

# AsyncIO Raw Batch Cursor
class AsyncIOMotorRawBatchCursor(AsyncIOMotorCursor):
    """Cursor that returns raw BSON bytes instead of decoded documents."""
    
    async def __anext__(self) -> bytes:
        """Get the next raw BSON document."""

# AsyncIO Raw Batch Command Cursor  
class AsyncIOMotorRawBatchCommandCursor(AsyncIOMotorCommandCursor):
    """Command cursor that returns raw BSON bytes."""
    
    async def __anext__(self) -> bytes:
        """Get the next raw BSON document."""

# Tornado equivalents
class MotorRawBatchCursor(MotorCursor):
    """Tornado cursor for raw BSON data."""
    
class MotorRawBatchCommandCursor(MotorCommandCursor):
    """Tornado command cursor for raw BSON data."""

Usage Examples

Basic Cursor Operations

import asyncio
import motor.motor_asyncio

async def cursor_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.test_collection
    
    # Insert sample data
    await collection.insert_many([
        {"name": "Alice", "age": 30, "city": "New York"},
        {"name": "Bob", "age": 25, "city": "San Francisco"},
        {"name": "Charlie", "age": 35, "city": "Chicago"},
        {"name": "Diana", "age": 28, "city": "New York"},
        {"name": "Eve", "age": 32, "city": "San Francisco"}
    ])
    
    # Basic cursor usage
    cursor = collection.find({"age": {"$gte": 25}})
    
    # Async iteration
    print("All users 25 or older:")
    async for document in cursor:
        print(f"  {document['name']} ({document['age']}) - {document['city']}")
    
    # Convert to list
    cursor = collection.find({"city": "New York"})
    users = await cursor.to_list(length=None)
    print(f"\nFound {len(users)} users in New York")
    
    # Cursor chaining
    cursor = collection.find()\
        .sort("age", -1)\
        .limit(3)\
        .skip(1)
    
    print("\nTop 3 oldest users (skipping 1st):")
    async for document in cursor:
        print(f"  {document['name']} ({document['age']})")
    
    client.close()

asyncio.run(cursor_example())

Advanced Cursor Configuration

import asyncio
import motor.motor_asyncio
import pymongo

async def advanced_cursor_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.products
    
    # Create index for examples
    await collection.create_index([("price", 1), ("category", 1)])
    
    # Insert sample products
    await collection.insert_many([
        {"name": "Laptop", "price": 999, "category": "Electronics", "brand": "Dell"},
        {"name": "Phone", "price": 699, "category": "Electronics", "brand": "Apple"},
        {"name": "Tablet", "price": 399, "category": "Electronics", "brand": "Samsung"},
        {"name": "Book", "price": 19, "category": "Books", "brand": "Penguin"},
        {"name": "Headphones", "price": 199, "category": "Electronics", "brand": "Sony"}
    ])
    
    # Complex cursor with multiple options
    cursor = collection.find(
        {"category": "Electronics"},
        {"name": 1, "price": 1, "brand": 1}  # Projection
    ).sort([
        ("price", pymongo.DESCENDING),
        ("name", pymongo.ASCENDING)
    ]).limit(10).batch_size(2).hint([("price", 1), ("category", 1)])
    
    print("Electronics sorted by price (desc), then name (asc):")
    async for product in cursor:
        print(f"  {product['name']}: ${product['price']} ({product['brand']})")
    
    # Cursor with collation for case-insensitive sorting
    cursor = collection.find().sort("name", 1).collation({
        "locale": "en",
        "strength": 2  # Case insensitive
    })
    
    print("\nProducts sorted case-insensitively:")
    async for product in cursor:
        print(f"  {product['name']}")
    
    # Cursor with comment and max time
    cursor = collection.find({"price": {"$lt": 500}})\
        .comment("Finding affordable products")\
        .max_time_ms(5000)  # 5 second timeout
    
    print("\nAffordable products (under $500):")
    try:
        async for product in cursor:
            print(f"  {product['name']}: ${product['price']}")
    except pymongo.errors.ExecutionTimeout:
        print("Query timed out!")
    
    client.close()

asyncio.run(advanced_cursor_example())

Command Cursor Usage

import asyncio
import motor.motor_asyncio

async def command_cursor_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    collection = db.sales
    
    # Insert sample sales data
    await collection.insert_many([
        {"product": "Laptop", "amount": 999, "date": "2023-01-15", "region": "North"},
        {"product": "Phone", "amount": 699, "date": "2023-01-16", "region": "South"},
        {"product": "Laptop", "amount": 999, "date": "2023-01-17", "region": "North"},
        {"product": "Tablet", "amount": 399, "date": "2023-01-18", "region": "East"},
        {"product": "Phone", "amount": 699, "date": "2023-01-19", "region": "West"}
    ])
    
    # Aggregation pipeline
    pipeline = [
        {"$group": {
            "_id": "$product",
            "total_sales": {"$sum": "$amount"},
            "count": {"$sum": 1}
        }},
        {"$sort": {"total_sales": -1}}
    ]
    
    # Get command cursor from aggregation
    cursor = collection.aggregate(pipeline)
    
    print("Sales by Product:")
    async for result in cursor:
        print(f"  {result['_id']}: ${result['total_sales']} ({result['count']} sales)")
    
    # List collections command cursor
    cursor = db.list_collections()
    
    print("\nCollections in database:")
    async for collection_info in cursor:
        print(f"  {collection_info['name']}: {collection_info['type']}")
    
    # List databases command cursor
    cursor = client.list_databases()
    
    print("\nDatabases:")
    async for db_info in cursor:
        print(f"  {db_info['name']}: {db_info['sizeOnDisk']} bytes")
    
    client.close()

asyncio.run(command_cursor_example())

Cursor Performance Optimization

import asyncio
import motor.motor_asyncio
import time

async def cursor_performance_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.large_collection
    
    # Insert large amount of test data
    print("Inserting test data...")
    batch_size = 1000
    for i in range(10):  # 10,000 documents
        batch = [
            {"index": i * batch_size + j, "value": f"value_{i * batch_size + j}"}
            for j in range(batch_size)
        ]
        await collection.insert_many(batch)
    
    print("Testing cursor performance...")
    
    # Test 1: Default batch size
    start_time = time.time()
    cursor = collection.find()
    count = 0
    async for doc in cursor:
        count += 1
    
    default_time = time.time() - start_time
    print(f"Default batch size: {count} docs in {default_time:.2f}s")
    
    # Test 2: Large batch size
    start_time = time.time()
    cursor = collection.find().batch_size(1000)
    count = 0
    async for doc in cursor:
        count += 1
    
    large_batch_time = time.time() - start_time
    print(f"Large batch size (1000): {count} docs in {large_batch_time:.2f}s")
    
    # Test 3: Convert to list (single network round trip)
    start_time = time.time()
    cursor = collection.find()
    docs = await cursor.to_list(length=None)
    list_time = time.time() - start_time
    print(f"to_list(): {len(docs)} docs in {list_time:.2f}s")
    
    # Test 4: Limited results
    start_time = time.time()
    cursor = collection.find().limit(1000)
    docs = await cursor.to_list(1000)
    limited_time = time.time() - start_time
    print(f"Limited (1000): {len(docs)} docs in {limited_time:.2f}s")
    
    # Cleanup
    await collection.drop()
    client.close()

asyncio.run(cursor_performance_example())

Cursor Error Handling

import asyncio
import motor.motor_asyncio
import pymongo.errors

async def cursor_error_handling_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.test_collection
    
    try:
        # Cursor with timeout
        cursor = collection.find().max_time_ms(1)  # Very short timeout
        
        async for document in cursor:
            print(document)
            
    except pymongo.errors.ExecutionTimeout:
        print("Cursor operation timed out")
    
    try:
        # Invalid sort specification
        cursor = collection.find().sort("invalid_field", 999)  # Invalid direction
        await cursor.to_list(None)
        
    except pymongo.errors.OperationFailure as e:
        print(f"Sort operation failed: {e}")
    
    try:
        # Cursor on dropped collection
        await collection.drop()
        cursor = collection.find()
        
        # This might work (empty result) or fail depending on timing
        async for document in cursor:
            print(document)
            
    except pymongo.errors.OperationFailure as e:
        print(f"Cursor on dropped collection: {e}")
    
    client.close()

asyncio.run(cursor_error_handling_example())

Types

from typing import Any, Optional, Union, Dict, List, Tuple, Iterator, AsyncIterator
import tornado.concurrent

# Sort specifications
SortKey = Union[str, List[Tuple[str, int]]]
SortDirection = int  # 1 for ascending, -1 for descending

# Cursor result types
Document = Dict[str, Any]
RawBSONBytes = bytes

# Cursor state
CursorId = Optional[int]
ServerAddress = Optional[Tuple[str, int]]

Install with Tessl CLI

npx tessl i tessl/pypi-motor

docs

asyncio-operations.md

change-streams.md

client-encryption.md

cursor-operations.md

gridfs-operations.md

index.md

tornado-operations.md

web-integration.md

tile.json