CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-motor

Non-blocking MongoDB driver for Python asyncio and Tornado applications

Pending
Overview
Eval results
Files

docs/tornado-operations.md

Tornado Operations

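Core MongoDB client, database, and collection operations optimized for Tornado's IOLoop and Future objects. These classes integrate with Tornado applications: I/O methods return Tornado Future objects that can be awaited in native coroutines, while the older callback-based and generator-based styles remain usable and are shown below as legacy examples.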

Capabilities

Tornado Client

The main entry point for Tornado-based MongoDB operations, providing connection management and database access with Tornado Future objects.

class MotorClient:
    """
    Tornado-flavored MongoDB client: the entry point for obtaining databases.

    A database can be reached with ``get_database()``, with attribute-style
    access (``client.test_database``) or with dictionary-style access
    (``client["test_database"]``). Methods annotated as returning
    ``tornado.concurrent.Future`` do their work asynchronously; await the
    Future to obtain the result.
    """

    def __init__(
        self,
        host: Union[str, List[str]] = 'localhost',
        port: int = 27017,
        document_class: type = dict,
        tz_aware: bool = False,
        connect: bool = True,
        io_loop=None,
        **kwargs
    ):
        """
        Create a new MotorClient connection to MongoDB.

        Parameters:
        - host: MongoDB host(s) to connect to
        - port: Port number for MongoDB connection
        - document_class: Default class for documents returned from queries
        - tz_aware: Whether datetime objects should be timezone-aware
        - connect: Whether to connect immediately or lazily
        - io_loop: Tornado IOLoop instance (optional, uses current if None)
        - **kwargs: Additional connection options (maxPoolSize, ssl, etc.)
        """

    def get_database(self, name: Optional[str] = None, **kwargs) -> MotorDatabase:
        """Get a database instance."""

    def __getitem__(self, name: str) -> MotorDatabase:
        """Get a database using dictionary-style access (client["name"])."""

    def __getattr__(self, name: str) -> MotorDatabase:
        """Get a database using attribute-style access (client.name)."""

    def get_default_database(self, **kwargs) -> MotorDatabase:
        """Get the default database specified in connection URI."""

    def list_databases(self, session=None, **kwargs) -> tornado.concurrent.Future:
        """List all databases on the MongoDB server."""

    def list_database_names(self, session=None, **kwargs) -> tornado.concurrent.Future:
        """Get names of all databases on the server."""

    def server_info(self) -> tornado.concurrent.Future:
        """Get information about the MongoDB server."""

    def start_session(self, **kwargs) -> tornado.concurrent.Future:
        """Start a logical session for use with transactions."""

    def drop_database(self, name_or_database: Union[str, MotorDatabase], session=None) -> tornado.concurrent.Future:
        """Drop a database, given either its name or a MotorDatabase."""

    def close(self) -> None:
        """Close all connections to MongoDB (synchronous; nothing to await)."""

    def watch(self, pipeline: Optional[List[Dict[str, Any]]] = None, **kwargs) -> MotorChangeStream:
        """Watch for changes across all collections in all databases."""

    # Read-only properties
    @property
    def address(self) -> Optional[Tuple[str, int]]:
        """Current connection address."""

    @property
    def primary(self) -> Optional[Tuple[str, int]]:
        """Primary server address in replica set."""

    @property
    def secondaries(self) -> Set[Tuple[str, int]]:
        """Secondary server addresses in replica set."""

    @property
    def is_primary(self) -> bool:
        """Whether connected to a primary server."""

    @property
    def is_mongos(self) -> bool:
        """Whether connected to a mongos router."""

Tornado Database

Represents a MongoDB database with Tornado Future-based operations.

class MotorDatabase:
    """
    Handle on a single MongoDB database for Tornado applications.

    Collections are reached with ``get_collection()``, dictionary-style
    access (``db["name"]``) or attribute-style access (``db.name``).
    Methods annotated as returning ``tornado.concurrent.Future`` are
    asynchronous; ``aggregate()`` and ``watch()`` return a cursor or change
    stream object directly instead of a Future.
    """

    @property
    def name(self) -> str:
        """Database name."""
    
    @property
    def client(self) -> MotorClient:
        """The client that owns this database."""
    
    def get_collection(self, name: str, **kwargs) -> MotorCollection:
        """Get a collection in this database by name."""
    
    def __getitem__(self, name: str) -> MotorCollection:
        """Get a collection using dictionary-style access (db["name"])."""
    
    def __getattr__(self, name: str) -> MotorCollection:
        """Get a collection using attribute-style access (db.name)."""
    
    def create_collection(
        self, 
        name: str,
        codec_options=None,
        read_preference=None,
        write_concern=None,
        read_concern=None,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """
        Create a new collection in this database.

        Parameters:
        - name: Name of the collection to create
        - codec_options, read_preference, write_concern, read_concern:
          Optional per-collection settings (all default to None)
        - session: Optional client session
        - **kwargs: Additional creation options
        """
    
    def drop_collection(
        self,
        name_or_collection: Union[str, MotorCollection],
        session=None
    ) -> tornado.concurrent.Future:
        """Drop a collection, given either its name or a MotorCollection."""
    
    def list_collection_names(
        self,
        session=None,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Get names of all collections in this database, optionally narrowed by filter."""
    
    def list_collections(
        self,
        session=None,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """List collections with metadata, optionally narrowed by filter."""
    
    def command(
        self,
        command: Union[str, Dict[str, Any]],
        value: Any = 1,
        check: bool = True,
        allowable_errors: Optional[List[str]] = None,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """
        Execute a database command.

        Parameters:
        - command: Command name, or a full command document
        - value: Value paired with the command name (defaults to 1)
        - check: Whether to check the server response for errors
        - allowable_errors: Error messages to tolerate when check is True
        - session: Optional client session
        - **kwargs: Additional command arguments
        """
    
    def aggregate(
        self,
        pipeline: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> MotorCommandCursor:
        """Execute an aggregation pipeline on the database and return a command cursor."""
    
    def watch(
        self,
        pipeline: Optional[List[Dict[str, Any]]] = None,
        session=None,
        **kwargs
    ) -> MotorChangeStream:
        """Watch for changes on all collections in this database."""

Tornado Collection

Represents a MongoDB collection with Tornado Future-based document operations.

class MotorCollection:
    """
    Handle on a single MongoDB collection for Tornado applications.

    Methods annotated as returning ``tornado.concurrent.Future`` are
    asynchronous and are awaited in the usage examples. ``find()``,
    ``aggregate()``, ``list_indexes()`` and ``watch()`` return a cursor or
    change stream object directly instead of a Future.
    """

    @property
    def name(self) -> str:
        """Collection name."""
    
    @property
    def full_name(self) -> str:
        """Full collection name (database.collection)."""
    
    @property
    def database(self) -> MotorDatabase:
        """The database that owns this collection."""
    
    # Insert Operations
    def insert_one(
        self,
        document: Dict[str, Any],
        bypass_document_validation: bool = False,
        session=None
    ) -> tornado.concurrent.Future:
        """
        Insert a single document.

        The usage examples await the returned Future and read
        ``inserted_id`` from the result.
        """
    
    def insert_many(
        self,
        documents: List[Dict[str, Any]],
        ordered: bool = True,
        bypass_document_validation: bool = False,
        session=None
    ) -> tornado.concurrent.Future:
        """
        Insert multiple documents.

        The usage examples await the returned Future and read
        ``inserted_ids`` from the result.
        """
    
    # Find Operations
    def find_one(
        self,
        filter: Optional[Dict[str, Any]] = None,
        *args,
        projection: Optional[Dict[str, Any]] = None,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """
        Find a single document.

        ``projection`` and ``session`` are keyword-only because they follow
        ``*args``. The usage examples await the returned Future to obtain
        the document.
        """
    
    def find(
        self,
        filter: Optional[Dict[str, Any]] = None,
        projection: Optional[Dict[str, Any]] = None,
        skip: int = 0,
        limit: int = 0,
        no_cursor_timeout: bool = False,
        cursor_type=None,
        sort: Optional[List[Tuple[str, int]]] = None,
        allow_partial_results: bool = False,
        batch_size: int = 0,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        max_time_ms: Optional[int] = None,
        session=None,
        **kwargs
    ) -> MotorCursor:
        """
        Find multiple documents, returns a cursor.

        The cursor is returned directly rather than through a Future; the
        web application example chains ``.limit(10)`` on it and iterates it
        with ``async for``.
        """
    
    def find_one_and_delete(
        self,
        filter: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Find and delete a single document."""
    
    def find_one_and_replace(
        self,
        filter: Dict[str, Any],
        replacement: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        upsert: bool = False,
        return_document: bool = False,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Find a single document matching filter and replace it with replacement."""
    
    def find_one_and_update(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        upsert: bool = False,
        return_document: bool = False,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Find a single document matching filter and apply update to it."""
    
    # Update Operations
    def update_one(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        upsert: bool = False,
        bypass_document_validation: bool = False,
        collation=None,
        array_filters: Optional[List[Dict[str, Any]]] = None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """
        Update a single document.

        The usage examples pass an update document such as
        ``{"$set": {"age": 31}}``, await the returned Future and read
        ``modified_count`` from the result.
        """
    
    def update_many(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        upsert: bool = False,
        array_filters: Optional[List[Dict[str, Any]]] = None,
        bypass_document_validation: bool = False,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """
        Update multiple documents.

        NOTE(review): the positional order of ``array_filters`` and
        ``bypass_document_validation`` differs from ``update_one``; pass
        them by keyword to avoid mix-ups.
        """
    
    def replace_one(
        self,
        filter: Dict[str, Any],
        replacement: Dict[str, Any],
        upsert: bool = False,
        bypass_document_validation: bool = False,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Replace a single document matching filter with replacement."""
    
    # Delete Operations
    def delete_one(
        self,
        filter: Dict[str, Any],
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """
        Delete a single document.

        The usage examples await the returned Future and read
        ``deleted_count`` from the result.
        """
    
    def delete_many(
        self,
        filter: Dict[str, Any],
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Delete multiple documents."""
    
    # Count Operations
    def count_documents(
        self,
        filter: Dict[str, Any],
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Count documents matching filter (filter is required)."""
    
    def estimated_document_count(self, **kwargs) -> tornado.concurrent.Future:
        """Estimate total document count."""
    
    # Index Operations
    def create_index(
        self,
        keys: Union[str, List[Tuple[str, int]]],
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Create a single index from a field name or a list of (field, direction) pairs."""
    
    def create_indexes(
        self,
        indexes: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Create multiple indexes."""
    
    def drop_index(
        self,
        index: Union[str, List[Tuple[str, int]]],
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Drop a single index."""
    
    def list_indexes(self, session=None) -> MotorCommandCursor:
        """List all indexes on the collection; returns a command cursor."""
    
    # Aggregation Operations
    def aggregate(
        self,
        pipeline: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> MotorCommandCursor:
        """Execute an aggregation pipeline; returns a command cursor."""
    
    def distinct(
        self,
        key: str,
        filter: Optional[Dict[str, Any]] = None,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Get distinct values for a field, optionally restricted by filter."""
    
    # Bulk Operations
    def bulk_write(
        self,
        requests: List[Any],
        ordered: bool = True,
        bypass_document_validation: bool = False,
        session=None
    ) -> tornado.concurrent.Future:
        """Execute bulk write operations."""
    
    # Change Streams
    def watch(
        self,
        pipeline: Optional[List[Dict[str, Any]]] = None,
        full_document: Optional[str] = None,
        resume_after: Optional[Dict[str, Any]] = None,
        max_await_time_ms: Optional[int] = None,
        batch_size: Optional[int] = None,
        collation=None,
        start_at_operation_time=None,
        session=None,
        start_after: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> MotorChangeStream:
        """Watch for changes on the collection; returns a change stream."""
    
    # Collection Management
    def drop(self, session=None) -> tornado.concurrent.Future:
        """Drop the collection."""
    
    def rename(self, new_name: str, session=None, **kwargs) -> tornado.concurrent.Future:
        """Rename the collection to new_name."""

Tornado Client Session

Client session for transaction support and causally consistent reads in Tornado applications.

class MotorClientSession:
    """
    A session for ordering sequential operations and transactions.
    
    Created via MotorClient.start_session(), not directly instantiated.
    Pass the session to database and collection methods through their
    ``session`` parameter.
    """
    
    # Properties
    @property
    def client(self) -> MotorClient:
        """The client this session was created from."""
    
    @property
    def cluster_time(self) -> Optional[Dict[str, Any]]:
        """The cluster time returned by the last operation."""
    
    @property
    def has_ended(self) -> bool:
        """Whether this session has ended."""
    
    @property
    def in_transaction(self) -> bool:
        """Whether this session is in an active transaction."""
    
    @property
    def operation_time(self) -> Optional[Any]:
        """The operation time returned by the last operation."""
    
    @property
    def options(self) -> Dict[str, Any]:
        """The options used to create this session."""
    
    @property
    def session_id(self) -> Dict[str, Any]:
        """A BSON document identifying this session."""
    
    # Transaction Methods
    def start_transaction(
        self,
        read_concern: Optional[Any] = None,
        write_concern: Optional[Any] = None,
        read_preference: Optional[Any] = None,
        max_commit_time_ms: Optional[int] = None
    ) -> Any:
        """
        Start a multi-statement transaction.
        
        Returns a context manager for the transaction.
        Use with context manager syntax.
        NOTE(review): presumably the asynchronous form (``async with``),
        since this is an async driver; confirm against the Motor docs.
        
        Parameters:
        - read_concern: Read concern for the transaction
        - write_concern: Write concern for the transaction
        - read_preference: Read preference for the transaction
        - max_commit_time_ms: Maximum time for commit operation
        
        Returns:
        Transaction context manager
        """
    
    def commit_transaction(self) -> tornado.concurrent.Future:
        """Commit the current transaction; returns a Future to await."""
    
    def abort_transaction(self) -> tornado.concurrent.Future:
        """Abort the current transaction; returns a Future to await."""
    
    def with_transaction(
        self,
        coro: Callable,
        read_concern: Optional[Any] = None,
        write_concern: Optional[Any] = None,
        read_preference: Optional[Any] = None,
        max_commit_time_ms: Optional[int] = None
    ) -> tornado.concurrent.Future:
        """
        Execute a coroutine within a transaction.
        
        Automatically handles transaction retry logic for transient errors.
        Will retry the entire transaction for up to 120 seconds.
        
        Parameters:
        - coro: Function that takes this session as first argument
        - read_concern: Read concern for the transaction
        - write_concern: Write concern for the transaction
        - read_preference: Read preference for the transaction
        - max_commit_time_ms: Maximum time for commit operation
        
        Returns:
        Tornado Future with return value from the coroutine
        """
    
    # Session Management
    def end_session(self) -> tornado.concurrent.Future:
        """End this session; returns a Future to await."""
    
    def advance_cluster_time(self, cluster_time: Dict[str, Any]) -> None:
        """Advance the cluster time for this session (synchronous)."""
    
    def advance_operation_time(self, operation_time: Any) -> None:
        """Advance the operation time for this session (synchronous)."""

Usage Examples

Basic Tornado Operations

import tornado.ioloop
import motor.motor_tornado

async def example():
    """
    Walk through insert, find, update and delete on ``test_collection``.

    Each Motor call returns a Future that is awaited here. The client is
    closed in a ``finally`` clause so its connections are released even when
    one of the operations raises.
    """
    client = motor.motor_tornado.MotorClient()
    try:
        db = client.test_database
        collection = db.test_collection

        # Insert operations
        result = await collection.insert_one({"name": "Alice", "age": 30})
        print(f"Inserted ID: {result.inserted_id}")

        results = await collection.insert_many([
            {"name": "Bob", "age": 25},
            {"name": "Charlie", "age": 35}
        ])
        print(f"Inserted IDs: {results.inserted_ids}")

        # Find operations
        document = await collection.find_one({"name": "Alice"})
        print(f"Found: {document}")

        # Update operations
        result = await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )
        print(f"Modified {result.modified_count} document(s)")

        # Delete operations
        result = await collection.delete_one({"name": "Alice"})
        print(f"Deleted {result.deleted_count} document(s)")
    finally:
        # Always release the client's connections, success or failure.
        client.close()

# Run the example coroutine on the current Tornado IOLoop; run_sync is handed
# the coroutine function itself, not the result of calling it.
tornado.ioloop.IOLoop.current().run_sync(example)

Callback-Style Operations (Legacy)

import tornado.ioloop
import motor.motor_tornado

def handle_result(result, error):
    """
    Print the outcome of the query, then stop the IOLoop.

    Parameters:
    - result: The document found (None when nothing matched or on error)
    - error: The exception raised by the operation, or None on success
    """
    if error is not None:
        print(f"Error: {error}")
    else:
        print(f"Result: {result}")
    tornado.ioloop.IOLoop.current().stop()


def _on_find_done(future):
    """Unpack a finished Future into (result, error) and report it."""
    # Check exception() first: result() re-raises a stored exception, which
    # would prevent handle_result from ever seeing the error.
    error = future.exception()
    result = None if error is not None else future.result()
    handle_result(result, error)


def callback_example():
    """Start a find_one query and register a callback on its Future."""
    client = motor.motor_tornado.MotorClient()
    collection = client.test_database.test_collection

    # Using callbacks (deprecated style)
    future = collection.find_one({"name": "Alice"})
    tornado.ioloop.IOLoop.current().add_future(future, _on_find_done)


# callback_example returns None, so run_sync would stop the loop as soon as
# it returned, before the query's Future resolves. Schedule it and keep the
# loop running until handle_result stops it.
io_loop = tornado.ioloop.IOLoop.current()
io_loop.add_callback(callback_example)
io_loop.start()

Generator-Style Operations (Legacy)

import tornado.gen
import tornado.ioloop
import motor.motor_tornado

@tornado.gen.coroutine
def gen_example():
    """Insert a document and read it back with a generator-based coroutine (legacy style)."""
    mongo_client = motor.motor_tornado.MotorClient()
    test_collection = mongo_client.test_database.test_collection

    try:
        # Each ``yield`` hands a Future to the decorator and resumes with its result.
        insert_result = yield test_collection.insert_one({"name": "test"})
        print(f"Inserted: {insert_result.inserted_id}")

        found_document = yield test_collection.find_one({"name": "test"})
        print(f"Found: {found_document}")
    finally:
        # Release the client's connections whether or not the operations succeeded.
        mongo_client.close()

# Run the generator-based coroutine on the current Tornado IOLoop.
tornado.ioloop.IOLoop.current().run_sync(gen_example)

Web Application Integration

import tornado.web
import tornado.ioloop
import motor.motor_tornado

class MainHandler(tornado.web.RequestHandler):
    """Serve up to ten documents from the ``users`` collection as JSON."""

    async def get(self):
        """
        Respond with ``{"users": [...]}``.

        The database handle is read from the application settings under
        the ``'db'`` key.
        """
        db = self.application.settings['db']
        collection = db.users

        # Find users
        users = []
        async for user in collection.find().limit(10):
            # write() JSON-encodes dicts, and the ObjectId stored under
            # "_id" is not JSON serializable, so send its string form.
            # NOTE(review): other non-JSON field types (e.g. datetimes)
            # would need similar conversion.
            if "_id" in user:
                user["_id"] = str(user["_id"])
            users.append(user)

        self.write({"users": users})

def make_app():
    """Build the Tornado application, exposing the Motor database to handlers as the ``db`` setting."""
    database = motor.motor_tornado.MotorClient().test_database
    routes = [
        (r"/", MainHandler),
    ]
    return tornado.web.Application(routes, db=database)

if __name__ == "__main__":
    # Build the application, bind it to port 8888, then start the IOLoop.
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

Types

from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union

import tornado.concurrent

# Placeholder aliases so the signatures above can be annotated; the concrete
# classes come from motor itself.
MotorClientSession = Any  # Actual session type from motor
MotorCommandCursor = Any  # Command cursor type (returned by aggregate / list_indexes)
MotorCursor = Any  # Query cursor type (returned by find)
MotorChangeStream = Any  # Change stream type (returned by watch)

Install with Tessl CLI

npx tessl i tessl/pypi-motor

docs

asyncio-operations.md

change-streams.md

client-encryption.md

cursor-operations.md

gridfs-operations.md

index.md

tornado-operations.md

web-integration.md

tile.json