CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-motor

Non-blocking MongoDB driver for Python asyncio and Tornado applications

Pending
Overview
Eval results
Files

asyncio-operations.mddocs/

AsyncIO Operations

Core MongoDB client, database, and collection operations optimized for Python's asyncio framework. These classes provide native async/await syntax support and integrate seamlessly with asyncio applications.

Capabilities

AsyncIO Client

The main entry point for asyncio-based MongoDB operations, providing connection management and database access.

class AsyncIOMotorClient:
    def __init__(
        self,
        host: Union[str, List[str]] = 'localhost',
        port: int = 27017,
        document_class: type = dict,
        tz_aware: bool = False,
        connect: bool = True,
        **kwargs
    ):
        """
        Create a new AsyncIOMotorClient connection to MongoDB.
        
        Parameters:
        - host: MongoDB host(s) to connect to
        - port: Port number for MongoDB connection  
        - document_class: Default class for documents returned from queries
        - tz_aware: Whether datetime objects should be timezone-aware
        - connect: Whether to connect immediately or lazily
        - **kwargs: Additional connection options (maxPoolSize, ssl, etc.)
        """

    def get_database(self, name: Optional[str] = None, **kwargs) -> AsyncIOMotorDatabase:
        """Get a database instance."""
    
    def get_default_database(self, **kwargs) -> AsyncIOMotorDatabase:
        """Get the default database specified in connection URI."""
    
    async def list_databases(self, session=None, **kwargs) -> AsyncIOMotorCommandCursor:
        """List all databases on the MongoDB server."""
    
    async def list_database_names(self, session=None, **kwargs) -> List[str]:
        """Get names of all databases on the server."""
    
    async def server_info(self) -> Dict[str, Any]:
        """Get information about the MongoDB server."""
    
    async def start_session(self, **kwargs) -> AsyncIOMotorClientSession:
        """Start a logical session for use with transactions."""
    
    async def drop_database(self, name_or_database: Union[str, AsyncIOMotorDatabase], session=None) -> None:
        """Drop a database."""
    
    def close(self) -> None:
        """Close all connections to MongoDB."""
    
    def watch(self, pipeline: Optional[List[Dict[str, Any]]] = None, **kwargs) -> AsyncIOMotorChangeStream:
        """Watch for changes across all collections in all databases."""
    
    # Read-only properties
    @property
    def address(self) -> Optional[Tuple[str, int]]:
        """Current connection address."""
    
    @property
    def primary(self) -> Optional[Tuple[str, int]]:
        """Primary server address in replica set."""
    
    @property
    def secondaries(self) -> Set[Tuple[str, int]]:
        """Secondary server addresses in replica set."""
    
    @property
    def is_primary(self) -> bool:
        """Whether connected to a primary server."""
    
    @property
    def is_mongos(self) -> bool:
        """Whether connected to a mongos router."""

AsyncIO Database

Represents a MongoDB database, providing access to collections and database-level operations.

class AsyncIOMotorDatabase:
    @property
    def name(self) -> str:
        """Database name."""
    
    @property
    def client(self) -> AsyncIOMotorClient:
        """The client that owns this database."""
    
    def get_collection(self, name: str, **kwargs) -> AsyncIOMotorCollection:
        """Get a collection in this database."""
    
    def __getitem__(self, name: str) -> AsyncIOMotorCollection:
        """Get a collection using dictionary-style access."""
    
    def __getattr__(self, name: str) -> AsyncIOMotorCollection:
        """Get a collection using attribute-style access."""
    
    async def create_collection(
        self, 
        name: str,
        codec_options=None,
        read_preference=None,
        write_concern=None,
        read_concern=None,
        session=None,
        **kwargs
    ) -> AsyncIOMotorCollection:
        """Create a new collection in this database."""
    
    async def drop_collection(
        self,
        name_or_collection: Union[str, AsyncIOMotorCollection],
        session=None
    ) -> None:
        """Drop a collection."""
    
    async def list_collection_names(
        self,
        session=None,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[str]:
        """Get names of all collections in this database."""
    
    async def list_collections(
        self,
        session=None,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> AsyncIOMotorCommandCursor:
        """List collections with metadata."""
    
    async def command(
        self,
        command: Union[str, Dict[str, Any]],
        value: Any = 1,
        check: bool = True,
        allowable_errors: Optional[List[str]] = None,
        session=None,
        **kwargs
    ) -> Dict[str, Any]:
        """Execute a database command."""
    
    def aggregate(
        self,
        pipeline: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> AsyncIOMotorCommandCursor:
        """Execute an aggregation pipeline on the database."""
    
    def watch(
        self,
        pipeline: Optional[List[Dict[str, Any]]] = None,
        session=None,
        **kwargs
    ) -> AsyncIOMotorChangeStream:
        """Watch for changes on all collections in this database."""

AsyncIO Collection

Represents a MongoDB collection, providing document-level operations like insert, find, update, and delete.

class AsyncIOMotorCollection:
    @property
    def name(self) -> str:
        """Collection name."""
    
    @property
    def full_name(self) -> str:
        """Full collection name (database.collection)."""
    
    @property
    def database(self) -> AsyncIOMotorDatabase:
        """The database that owns this collection."""
    
    # Insert Operations
    async def insert_one(
        self,
        document: Dict[str, Any],
        bypass_document_validation: bool = False,
        session=None
    ) -> InsertOneResult:
        """Insert a single document."""
    
    async def insert_many(
        self,
        documents: List[Dict[str, Any]],
        ordered: bool = True,
        bypass_document_validation: bool = False,
        session=None
    ) -> InsertManyResult:
        """Insert multiple documents."""
    
    # Find Operations
    async def find_one(
        self,
        filter: Optional[Dict[str, Any]] = None,
        *args,
        projection: Optional[Dict[str, Any]] = None,
        session=None,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Find a single document."""
    
    def find(
        self,
        filter: Optional[Dict[str, Any]] = None,
        projection: Optional[Dict[str, Any]] = None,
        skip: int = 0,
        limit: int = 0,
        no_cursor_timeout: bool = False,
        cursor_type=None,
        sort: Optional[List[Tuple[str, int]]] = None,
        allow_partial_results: bool = False,
        batch_size: int = 0,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        max_time_ms: Optional[int] = None,
        session=None,
        **kwargs
    ) -> AsyncIOMotorCursor:
        """Find multiple documents, returns a cursor."""
    
    async def find_one_and_delete(
        self,
        filter: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        session=None,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Find and delete a single document."""
    
    async def find_one_and_replace(
        self,
        filter: Dict[str, Any],
        replacement: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        upsert: bool = False,
        return_document: bool = False,
        session=None,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Find and replace a single document."""
    
    async def find_one_and_update(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        upsert: bool = False,
        return_document: bool = False,
        session=None,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Find and update a single document."""
    
    # Update Operations
    async def update_one(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        upsert: bool = False,
        bypass_document_validation: bool = False,
        collation=None,
        array_filters: Optional[List[Dict[str, Any]]] = None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> UpdateResult:
        """Update a single document."""
    
    async def update_many(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        upsert: bool = False,
        array_filters: Optional[List[Dict[str, Any]]] = None,
        bypass_document_validation: bool = False,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> UpdateResult:
        """Update multiple documents."""
    
    async def replace_one(
        self,
        filter: Dict[str, Any],
        replacement: Dict[str, Any],
        upsert: bool = False,
        bypass_document_validation: bool = False,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> UpdateResult:
        """Replace a single document."""
    
    # Delete Operations
    async def delete_one(
        self,
        filter: Dict[str, Any],
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> DeleteResult:
        """Delete a single document."""
    
    async def delete_many(
        self,
        filter: Dict[str, Any],
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> DeleteResult:
        """Delete multiple documents."""
    
    # Count Operations
    async def count_documents(
        self,
        filter: Dict[str, Any],
        session=None,
        **kwargs
    ) -> int:
        """Count documents matching filter."""
    
    async def estimated_document_count(self, **kwargs) -> int:
        """Estimate total document count."""
    
    # Index Operations
    async def create_index(
        self,
        keys: Union[str, List[Tuple[str, int]]],
        session=None,
        **kwargs
    ) -> str:
        """Create a single index."""
    
    async def create_indexes(
        self,
        indexes: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> List[str]:
        """Create multiple indexes."""
    
    async def drop_index(
        self,
        index: Union[str, List[Tuple[str, int]]],
        session=None,
        **kwargs
    ) -> None:
        """Drop a single index."""
    
    def list_indexes(self, session=None) -> AsyncIOMotorCommandCursor:
        """List all indexes on the collection."""
    
    # Aggregation Operations
    def aggregate(
        self,
        pipeline: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> AsyncIOMotorCommandCursor:
        """Execute an aggregation pipeline."""
    
    def distinct(
        self,
        key: str,
        filter: Optional[Dict[str, Any]] = None,
        session=None,
        **kwargs
    ) -> AsyncIOMotorCommandCursor:
        """Get distinct values for a field."""
    
    # Bulk Operations
    def bulk_write(
        self,
        requests: List[Any],
        ordered: bool = True,
        bypass_document_validation: bool = False,
        session=None
    ) -> Any:
        """Execute bulk write operations."""
    
    # Change Streams
    def watch(
        self,
        pipeline: Optional[List[Dict[str, Any]]] = None,
        full_document: Optional[str] = None,
        resume_after: Optional[Dict[str, Any]] = None,
        max_await_time_ms: Optional[int] = None,
        batch_size: Optional[int] = None,
        collation=None,
        start_at_operation_time=None,
        session=None,
        start_after: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> AsyncIOMotorChangeStream:
        """Watch for changes on the collection."""
    
    # Collection Management
    async def drop(self, session=None) -> None:
        """Drop the collection."""
    
    async def rename(self, new_name: str, session=None, **kwargs) -> None:
        """Rename the collection."""

AsyncIO Client Session

Client session for transaction support and causally consistent reads in AsyncIO applications.

class AsyncIOMotorClientSession:
    """
    A session for ordering sequential operations and transactions.
    
    Created via AsyncIOMotorClient.start_session(), not directly instantiated.
    """
    
    # Properties
    @property
    def client(self) -> AsyncIOMotorClient:
        """The client this session was created from."""
    
    @property
    def cluster_time(self) -> Optional[Dict[str, Any]]:
        """The cluster time returned by the last operation."""
    
    @property
    def has_ended(self) -> bool:
        """Whether this session has ended."""
    
    @property
    def in_transaction(self) -> bool:
        """Whether this session is in an active transaction."""
    
    @property
    def operation_time(self) -> Optional[Any]:
        """The operation time returned by the last operation."""
    
    @property
    def options(self) -> Dict[str, Any]:
        """The options used to create this session."""
    
    @property
    def session_id(self) -> Dict[str, Any]:
        """A BSON document identifying this session."""
    
    # Transaction Methods
    def start_transaction(
        self,
        read_concern: Optional[Any] = None,
        write_concern: Optional[Any] = None,
        read_preference: Optional[Any] = None,
        max_commit_time_ms: Optional[int] = None
    ) -> Any:
        """
        Start a multi-statement transaction.
        
        Returns a context manager for the transaction.
        Use with async context manager syntax.
        
        Parameters:
        - read_concern: Read concern for the transaction
        - write_concern: Write concern for the transaction  
        - read_preference: Read preference for the transaction
        - max_commit_time_ms: Maximum time for commit operation
        
        Returns:
        Transaction context manager
        """
    
    async def commit_transaction(self) -> None:
        """Commit the current transaction."""
    
    async def abort_transaction(self) -> None:
        """Abort the current transaction."""
    
    async def with_transaction(
        self,
        coro: Callable,
        read_concern: Optional[Any] = None,
        write_concern: Optional[Any] = None,
        read_preference: Optional[Any] = None,
        max_commit_time_ms: Optional[int] = None
    ) -> Any:
        """
        Execute a coroutine within a transaction.
        
        Automatically handles transaction retry logic for transient errors.
        Will retry the entire transaction for up to 120 seconds.
        
        Parameters:
        - coro: Async function that takes this session as first argument
        - read_concern: Read concern for the transaction
        - write_concern: Write concern for the transaction
        - read_preference: Read preference for the transaction
        - max_commit_time_ms: Maximum time for commit operation
        
        Returns:
        Return value from the coroutine
        """
    
    # Session Management
    async def end_session(self) -> None:
        """End this session."""
    
    def advance_cluster_time(self, cluster_time: Dict[str, Any]) -> None:
        """Advance the cluster time for this session."""
    
    def advance_operation_time(self, operation_time: Any) -> None:
        """Advance the operation time for this session."""
    
    # Context Manager Protocol
    async def __aenter__(self) -> AsyncIOMotorClientSession:
        """Async context manager entry."""
    
    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""

Usage Examples

Basic AsyncIO Operations

import asyncio
import motor.motor_asyncio

async def example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    collection = db.test_collection
    
    # Insert operations
    result = await collection.insert_one({"name": "Alice", "age": 30})
    print(f"Inserted ID: {result.inserted_id}")
    
    results = await collection.insert_many([
        {"name": "Bob", "age": 25},
        {"name": "Charlie", "age": 35}
    ])
    print(f"Inserted IDs: {results.inserted_ids}")
    
    # Find operations
    document = await collection.find_one({"name": "Alice"})
    print(f"Found: {document}")
    
    # Update operations
    result = await collection.update_one(
        {"name": "Alice"},
        {"$set": {"age": 31}}
    )
    print(f"Modified {result.modified_count} document(s)")
    
    # Delete operations
    result = await collection.delete_one({"name": "Alice"})
    print(f"Deleted {result.deleted_count} document(s)")
    
    client.close()

asyncio.run(example())

Cursor Iteration

async def cursor_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.test_collection
    
    # Async iteration
    async for document in collection.find({"age": {"$gte": 18}}):
        print(document)
    
    # To list with limit
    cursor = collection.find().limit(10)
    documents = await cursor.to_list(length=10)
    print(f"Found {len(documents)} documents")
    
    client.close()

Error Handling

import pymongo.errors

async def error_handling_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.test_collection
    
    try:
        await collection.insert_one({"_id": 1, "name": "test"})
        await collection.insert_one({"_id": 1, "name": "duplicate"})  # Will fail
    except pymongo.errors.DuplicateKeyError as e:
        print(f"Duplicate key error: {e}")
    except pymongo.errors.ConnectionFailure as e:
        print(f"Connection failed: {e}")
    finally:
        client.close()

Types

from typing import Any, Optional, Union, Dict, List, Tuple, Set
from datetime import datetime

AsyncIOMotorClientSession = Any  # Actual session type from motor
AsyncIOMotorCommandCursor = Any  # Command cursor type
AsyncIOMotorCursor = Any  # Query cursor type  
AsyncIOMotorChangeStream = Any  # Change stream type

Install with Tessl CLI

npx tessl i tessl/pypi-motor

docs

asyncio-operations.md

change-streams.md

client-encryption.md

cursor-operations.md

gridfs-operations.md

index.md

tornado-operations.md

web-integration.md

tile.json