tessl/pypi-motor

Non-blocking MongoDB driver for Python asyncio and Tornado applications

  • Workspace: tessl
  • Visibility: Public
  • Describes: pkg:pypi/motor@3.7.x (PyPI)

To install, run

npx @tessl/cli install tessl/pypi-motor@3.7.0


Motor

Motor is a full-featured, non-blocking MongoDB driver for Python asyncio and Tornado applications. It exposes a coroutine-based API, so database operations can be awaited without blocking the event loop, which enables high-throughput database access in asynchronous Python applications.

Package Information

  • Package Name: motor
  • Language: Python
  • Installation: pip install motor
  • License: Apache License 2.0
  • Documentation: https://motor.readthedocs.io/en/stable/

Core Imports

For asyncio applications:

import motor.motor_asyncio as motor

For Tornado applications:

import motor.motor_tornado as motor

Specific imports:

from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection
from motor.motor_tornado import MotorClient, MotorDatabase, MotorCollection

Basic Usage

AsyncIO Example

import asyncio
import motor.motor_asyncio

async def main():
    # Connect to MongoDB
    client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
    
    # Get database and collection
    db = client.my_database
    collection = db.my_collection
    
    # Insert a document
    result = await collection.insert_one({'name': 'Alice', 'age': 30})
    print(f"Inserted document with ID: {result.inserted_id}")
    
    # Find documents
    async for document in collection.find({'age': {'$gte': 18}}):
        print(document)
    
    # Find one document
    user = await collection.find_one({'name': 'Alice'})
    print(f"Found user: {user}")
    
    # Update a document
    await collection.update_one(
        {'name': 'Alice'},
        {'$set': {'age': 31}}
    )
    
    # Delete a document
    await collection.delete_one({'name': 'Alice'})
    
    # Close the connection
    client.close()

asyncio.run(main())

Tornado Example

import tornado.ioloop
import motor.motor_tornado

async def main():
    client = motor.motor_tornado.MotorClient('mongodb://localhost:27017')
    db = client.my_database
    collection = db.my_collection
    
    # Motor methods return Futures, which a native coroutine can await
    result = await collection.insert_one({'name': 'Bob', 'age': 25})
    print(f"Inserted document with ID: {result.inserted_id}")
    
    client.close()

if __name__ == '__main__':
    # run_sync starts the IOLoop, runs main() to completion, then stops the loop
    tornado.ioloop.IOLoop.current().run_sync(main)

Architecture

Motor uses a sophisticated metaprogramming system to create framework-specific classes from framework-agnostic base classes:

  • Framework-Agnostic Core: Base classes in motor.core define all MongoDB operations
  • Framework-Specific Wrappers: Classes in motor_asyncio and motor_tornado adapt the core for each framework
  • Metaprogramming Decorators: Wrap blocking PyMongo operations so that they run in a thread pool and return awaitable results to the event loop
  • Dual API Support: Complete API compatibility with both asyncio and Tornado frameworks

This design enables Motor to provide identical functionality for both asyncio and Tornado while maintaining a single codebase for core MongoDB operations.
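
The wrapping idea described above can be illustrated with a small sketch. This is not Motor's source code: `BlockingCollection`, `asynchronize`, and `create_async_class` are hypothetical names chosen for illustration, and the sketch assumes the general approach of running a blocking call in the event loop's default thread pool.

```python
import asyncio
import functools
import time


class BlockingCollection:
    """Hypothetical stand-in for a synchronous driver class (not PyMongo)."""

    def __init__(self):
        self._docs = []

    def insert_one(self, document):
        time.sleep(0.01)  # simulate blocking network I/O
        self._docs.append(document)

    def count_documents(self):
        time.sleep(0.01)
        return len(self._docs)


def asynchronize(method_name):
    """Build a coroutine method that runs the blocking method in a thread pool."""

    async def wrapper(self, *args, **kwargs):
        loop = asyncio.get_running_loop()
        blocking = getattr(self.delegate, method_name)
        call = functools.partial(blocking, *args, **kwargs)
        # Passing None selects the loop's default ThreadPoolExecutor
        return await loop.run_in_executor(None, call)

    wrapper.__name__ = method_name
    return wrapper


def create_async_class(name, delegate_class, method_names):
    """Generate an async wrapper class from a list of method names."""

    def init(self):
        self.delegate = delegate_class()

    attrs = {"__init__": init}
    for method_name in method_names:
        attrs[method_name] = asynchronize(method_name)
    return type(name, (), attrs)


AsyncCollection = create_async_class(
    "AsyncCollection", BlockingCollection, ["insert_one", "count_documents"]
)


async def demo():
    coll = AsyncCollection()
    # The three inserts overlap because each one runs in its own worker thread
    await asyncio.gather(*(coll.insert_one({"n": i}) for i in range(3)))
    return await coll.count_documents()
```

Calling `asyncio.run(demo())` drives the generated class from an event loop. The same factory could be pointed at a different event loop integration, which is the benefit the single-codebase design is after.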

Capabilities

AsyncIO Client Operations

Core MongoDB client, database, and collection operations optimized for Python's asyncio framework. Supports async/await syntax and native asyncio.Future objects.

class AsyncIOMotorClient:
    def __init__(self, host='localhost', port=27017, **kwargs): ...
    def get_database(self, name=None, **kwargs) -> AsyncIOMotorDatabase: ...
    def get_default_database(self, **kwargs) -> AsyncIOMotorDatabase: ...
    async def list_databases(self, session=None, **kwargs): ...
    async def list_database_names(self, session=None, **kwargs) -> list[str]: ...
    async def server_info(self) -> dict: ...
    async def start_session(self, **kwargs): ...
    def close(self) -> None: ...

class AsyncIOMotorDatabase:
    def get_collection(self, name, **kwargs) -> AsyncIOMotorCollection: ...
    async def create_collection(self, name, **kwargs) -> AsyncIOMotorCollection: ...
    async def drop_collection(self, name_or_collection, **kwargs): ...
    async def list_collection_names(self, **kwargs) -> list[str]: ...
    async def command(self, command, **kwargs): ...
    
class AsyncIOMotorCollection:
    async def insert_one(self, document, **kwargs): ...
    async def insert_many(self, documents, **kwargs): ...
    async def find_one(self, filter=None, **kwargs): ...
    def find(self, filter=None, **kwargs) -> AsyncIOMotorCursor: ...
    async def update_one(self, filter, update, **kwargs): ...
    async def update_many(self, filter, update, **kwargs): ...
    async def delete_one(self, filter, **kwargs): ...
    async def delete_many(self, filter, **kwargs): ...
    async def count_documents(self, filter, **kwargs) -> int: ...

class AsyncIOMotorClientSession:
    def start_transaction(self, **kwargs): ...  # not a coroutine; use with "async with"
    async def commit_transaction(self): ...
    async def abort_transaction(self): ...
    async def with_transaction(self, coro, **kwargs): ...
    async def end_session(self): ...
    async def __aenter__(self): ...
    async def __aexit__(self, exc_type, exc_val, exc_tb): ...

AsyncIO Operations
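
In the listing above, methods declared with `async def` perform network I/O and must be awaited, while plain `def` methods such as `get_database` and `get_collection` only build objects locally. A minimal sketch of mixing the two follows; `collection_sizes` is a hypothetical helper, and `client` is assumed to be an `AsyncIOMotorClient` connected to a reachable server.

```python
import asyncio


async def collection_sizes(client, db_name):
    """Return a {collection name: document count} mapping for one database."""
    db = client.get_database(db_name)         # synchronous, no I/O
    names = await db.list_collection_names()  # coroutine, talks to the server

    # get_collection() is synchronous; count_documents() returns an awaitable,
    # so the counts can be requested concurrently with asyncio.gather
    counts = await asyncio.gather(
        *(db.get_collection(name).count_documents({}) for name in names)
    )
    return dict(zip(names, counts))
```

Inside a running coroutine this could be called as `sizes = await collection_sizes(client, 'my_database')`.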

Tornado Client Operations

Core MongoDB client, database, and collection operations for Tornado's IOLoop. Methods that perform I/O return Future objects, which can be awaited from native coroutines.

class MotorClient:
    def __init__(self, host='localhost', port=27017, **kwargs): ...
    def get_database(self, name=None, **kwargs) -> MotorDatabase: ...
    def get_default_database(self, **kwargs) -> MotorDatabase: ...
    def list_databases(self, session=None, **kwargs) -> tornado.concurrent.Future: ...
    def list_database_names(self, session=None, **kwargs) -> tornado.concurrent.Future: ...
    def server_info(self) -> tornado.concurrent.Future: ...
    def start_session(self, **kwargs) -> tornado.concurrent.Future: ...
    def close(self) -> None: ...

class MotorDatabase:
    def get_collection(self, name, **kwargs) -> MotorCollection: ...
    def create_collection(self, name, **kwargs) -> tornado.concurrent.Future: ...
    def drop_collection(self, name_or_collection, **kwargs) -> tornado.concurrent.Future: ...
    def list_collection_names(self, **kwargs) -> tornado.concurrent.Future: ...
    def command(self, command, **kwargs) -> tornado.concurrent.Future: ...
    
class MotorCollection:
    def insert_one(self, document, **kwargs) -> tornado.concurrent.Future: ...
    def insert_many(self, documents, **kwargs) -> tornado.concurrent.Future: ...
    def find_one(self, filter=None, **kwargs) -> tornado.concurrent.Future: ...
    def find(self, filter=None, **kwargs) -> MotorCursor: ...
    def update_one(self, filter, update, **kwargs) -> tornado.concurrent.Future: ...
    def update_many(self, filter, update, **kwargs) -> tornado.concurrent.Future: ...
    def delete_one(self, filter, **kwargs) -> tornado.concurrent.Future: ...
    def delete_many(self, filter, **kwargs) -> tornado.concurrent.Future: ...
    def count_documents(self, filter, **kwargs) -> tornado.concurrent.Future: ...

class MotorClientSession:
    def start_transaction(self, **kwargs): ...
    def commit_transaction(self) -> tornado.concurrent.Future: ...
    def abort_transaction(self) -> tornado.concurrent.Future: ...
    def with_transaction(self, coro, **kwargs) -> tornado.concurrent.Future: ...
    def end_session(self) -> tornado.concurrent.Future: ...

Tornado Operations

GridFS File Storage

GridFS support for storing and retrieving large files in MongoDB. Provides streaming file operations for both asyncio and Tornado frameworks.

class AsyncIOMotorGridFSBucket:
    def __init__(self, database, bucket_name='fs', **kwargs): ...
    async def upload_from_stream(self, filename, source, **kwargs): ...
    async def upload_from_stream_with_id(self, file_id, filename, source, **kwargs): ...
    async def open_download_stream(self, file_id, **kwargs) -> AsyncIOMotorGridOut: ...
    async def open_download_stream_by_name(self, filename, **kwargs) -> AsyncIOMotorGridOut: ...
    def open_upload_stream(self, filename, **kwargs) -> AsyncIOMotorGridIn: ...
    def open_upload_stream_with_id(self, file_id, filename, **kwargs) -> AsyncIOMotorGridIn: ...
    async def delete(self, file_id, **kwargs): ...
    async def rename(self, file_id, new_filename, **kwargs): ...

class AsyncIOMotorGridOut:
    # Properties
    _id: Any
    filename: str
    length: int
    content_type: str
    upload_date: datetime.datetime
    metadata: dict
    
    # Methods
    async def open(self): ...
    async def read(self, size=-1) -> bytes: ...
    async def readline(self, size=-1) -> bytes: ...
    async def readchunk(self) -> bytes: ...
    def seek(self, pos, whence=0) -> int: ...
    def tell(self) -> int: ...
    def close(self) -> None: ...

class AsyncIOMotorGridIn:
    # Properties
    _id: Any
    filename: str
    content_type: str
    chunk_size: int
    closed: bool
    
    # Methods
    async def write(self, data) -> None: ...
    async def writelines(self, lines) -> None: ...
    async def close(self) -> None: ...
    async def abort(self) -> None: ...

GridFS Operations
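
As a sketch of how the bucket methods above fit together, the helper below uploads a payload and then streams it back chunk by chunk. `store_and_read_back` is a hypothetical name; `bucket` is assumed to be an `AsyncIOMotorGridFSBucket`, `data` is assumed to be bytes or a file-like object, and `readchunk()` is assumed to return empty bytes at end of file.

```python
async def store_and_read_back(bucket, filename, data):
    """Upload data to GridFS, then download it again one chunk at a time."""
    # upload_from_stream returns the _id of the newly stored file
    file_id = await bucket.upload_from_stream(filename, data)

    # open_download_stream is a coroutine that yields a readable grid-out object
    grid_out = await bucket.open_download_stream(file_id)

    chunks = []
    while True:
        chunk = await grid_out.readchunk()
        if not chunk:  # empty bytes signal end of file
            break
        chunks.append(chunk)
    return file_id, b"".join(chunks)
```

Reading by chunk keeps memory use bounded when each chunk is forwarded (for example to an HTTP response) instead of being accumulated as it is here.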

Cursor Operations

Cursor functionality for iterating over query results, command results, and change streams. Supports both async iteration and traditional cursor methods.

class AsyncIOMotorCursor:
    def limit(self, limit) -> AsyncIOMotorCursor: ...
    def skip(self, skip) -> AsyncIOMotorCursor: ...
    def sort(self, key_or_list, direction=None) -> AsyncIOMotorCursor: ...
    def batch_size(self, batch_size) -> AsyncIOMotorCursor: ...
    async def to_list(self, length=None) -> list: ...
    def __aiter__(self): ...
    async def __anext__(self): ...

class AsyncIOMotorCommandCursor:
    def batch_size(self, batch_size) -> AsyncIOMotorCommandCursor: ...
    async def to_list(self, length=None) -> list: ...
    def __aiter__(self): ...
    async def __anext__(self): ...

class AsyncIOMotorLatentCommandCursor(AsyncIOMotorCommandCursor):
    def batch_size(self, batch_size) -> AsyncIOMotorLatentCommandCursor: ...
    async def to_list(self, length=None) -> list: ...
    def __aiter__(self): ...
    async def __anext__(self): ...

class AsyncIOMotorChangeStream:
    async def next(self): ...
    def __aiter__(self): ...
    async def __anext__(self): ...
    async def close(self): ...
    @property
    def resume_token(self): ...

Cursor Operations
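
The cursor modifiers above are synchronous and chainable; only fetching results needs `await`. A minimal pagination sketch under those assumptions follows, where `fetch_page` and its parameters are hypothetical names and `collection` is assumed to be an `AsyncIOMotorCollection`.

```python
async def fetch_page(collection, query, sort_key, page, page_size):
    """Return one page of matching documents, sorted descending by sort_key."""
    cursor = (
        collection.find(query)      # builds the cursor, no I/O yet
        .sort(sort_key, -1)         # -1 means descending order
        .skip(page * page_size)     # pages are numbered from 0
        .limit(page_size)
    )
    # to_list runs the query and gathers at most page_size documents
    return await cursor.to_list(length=page_size)
```

For large result sets, `async for document in cursor` avoids holding every document in memory at once.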

Change Streams

Real-time change monitoring at the deployment, database, or collection level. Enables reactive applications that respond to data modifications as they happen.

# Available as a method on AsyncIOMotorClient, AsyncIOMotorDatabase, and AsyncIOMotorCollection
def watch(pipeline=None, **kwargs) -> AsyncIOMotorChangeStream: ...

class AsyncIOMotorChangeStream:
    async def next(self): ...
    def __aiter__(self): ...
    async def __anext__(self): ...
    async def close(self): ...
    @property
    def resume_token(self): ...

Change Streams
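
A minimal sketch of consuming a change stream, assuming `collection` is an `AsyncIOMotorCollection` on a deployment that supports change streams (a replica set or sharded cluster). `collect_inserts` is a hypothetical helper name.

```python
async def collect_inserts(collection, max_events):
    """Wait for insert events and return their documents plus a resume token."""
    # Server-side filter: only insert events are delivered to the client
    pipeline = [{"$match": {"operationType": "insert"}}]
    documents = []

    # "async with" closes the stream on exit, even if an error is raised
    async with collection.watch(pipeline) as stream:
        async for change in stream:
            documents.append(change["fullDocument"])
            if len(documents) >= max_events:
                break
        # The token can be stored and later passed as resume_after=...
        token = stream.resume_token
    return documents, token
```

Persisting the returned token lets a restarted process continue from the last event it handled rather than from the present moment.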

Web Integration

HTTP handlers for serving GridFS files through web frameworks. Includes optimized handlers for both Tornado and aiohttp with support for HTTP caching, range requests, and content negotiation.

# Tornado integration (motor.web)
class GridFSHandler(tornado.web.RequestHandler):
    def initialize(self, database, **kwargs): ...
    async def get(self, filename, include_body=True): ...
    async def head(self, filename): ...

# aiohttp integration (motor.aiohttp)
class AIOHTTPGridFS:
    def __init__(self, database, root_collection='fs', **kwargs): ...
    async def __call__(self, request) -> aiohttp.web.Response: ...

# Default hook functions in motor.aiohttp, used by AIOHTTPGridFS
def get_gridfs_file(bucket, filename, request): ...
def get_cache_time(filename, modified, mime_type) -> int: ...
def set_extra_headers(response, gridout) -> None: ...

Web Integration
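
The `get_cache_time(filename, modified, mime_type) -> int` signature above suggests a hook that decides how long clients may cache a file. The sketch below is a replacement hook with the same signature; `cache_images_for_a_day` is a hypothetical name, and passing it as a `get_cache_time=` keyword argument to `AIOHTTPGridFS` is an assumption based on the `**kwargs` in the listed constructor.

```python
ONE_DAY_SECONDS = 24 * 60 * 60


def cache_images_for_a_day(filename, modified, mime_type):
    """Return the cache lifetime in seconds for a file served from GridFS."""
    # Images rarely change once uploaded, so allow clients to cache them
    if mime_type and mime_type.startswith("image/"):
        return ONE_DAY_SECONDS
    # Everything else is revalidated on every request
    return 0


# Assumed wiring (requires aiohttp, motor, and a database handle named db):
# handler = AIOHTTPGridFS(db, get_cache_time=cache_images_for_a_day)
```

Keeping the policy in a plain function makes it easy to unit test without a running web server or database.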

Client-Side Field Level Encryption

Encryption and decryption of fields for client-side field level encryption (CSFLE). Provides key management, field encryption/decryption, and secure data operations.

class AsyncIOMotorClientEncryption:
    def __init__(
        self,
        kms_providers: dict,
        key_vault_namespace: str,
        key_vault_client: AsyncIOMotorClient,
        codec_options: CodecOptions,
        **kwargs
    ): ...
    async def create_data_key(
        self,
        kms_provider: str,
        master_key: Optional[dict] = None,
        key_alt_names: Optional[list] = None,
        key_material: Optional[bytes] = None
    ) -> Binary: ...
    async def encrypt(
        self,
        value: Any,
        algorithm: str,
        key_id: Optional[Binary] = None,
        key_alt_name: Optional[str] = None,
        **kwargs
    ) -> Binary: ...
    async def decrypt(self, value: Binary) -> Any: ...
    async def close(self) -> None: ...

Client Encryption
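
A minimal round-trip sketch using the methods listed above. `encrypt_round_trip` is a hypothetical helper, `client_encryption` is assumed to be an already configured `AsyncIOMotorClientEncryption` with a `local` KMS provider, and the algorithm string is assumed to match the deterministic algorithm identifier used by PyMongo.

```python
# Assumed identifier of the deterministic CSFLE algorithm
DETERMINISTIC = "AEAD_AES_256_CBC_HMAC_SHA_512-Deterministic"


async def encrypt_round_trip(client_encryption, value, key_alt_name="example-key"):
    """Create a data key, encrypt a value with it, and decrypt it again."""
    # The data key is stored in the key vault; its id is returned
    key_id = await client_encryption.create_data_key(
        "local", key_alt_names=[key_alt_name]
    )
    # encrypt() needs a key reference: either key_id or key_alt_name
    encrypted = await client_encryption.encrypt(value, DETERMINISTIC, key_id=key_id)
    decrypted = await client_encryption.decrypt(encrypted)
    return encrypted, decrypted
```

Deterministic encryption produces the same ciphertext for the same input, which is what makes equality queries on the encrypted field possible.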

Common Patterns

Connection Management

# AsyncIO with connection pooling
client = motor.motor_asyncio.AsyncIOMotorClient(
    'mongodb://localhost:27017',
    maxPoolSize=10,
    minPoolSize=5
)

# Always close connections
try:
    # Your database operations
    pass
finally:
    client.close()

Error Handling

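import pymongo.errors

async def insert_document(collection, document):
    try:
        await collection.insert_one(document)
    except pymongo.errors.DuplicateKeyError:
        # A unique index (including the default _id index) rejected the document
        print("Document already exists")
    except pymongo.errors.ConnectionFailure:
        print("Failed to connect to MongoDB")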

Transaction Support

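# Transactions require a replica set or sharded cluster
async def run_in_transaction(client, collection1, collection2, doc1, query, update):
    # start_session() is a coroutine, so it is awaited before entering the context
    async with await client.start_session() as session:
        # The transaction commits on normal exit and aborts if an exception is raised
        async with session.start_transaction():
            await collection1.insert_one(doc1, session=session)
            await collection2.update_one(query, update, session=session)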
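
An alternative sketch uses `with_transaction`, which takes an async callback and handles commit and abort itself. `transfer` and its parameters are hypothetical names; `client` is assumed to be an `AsyncIOMotorClient` and `accounts` an `AsyncIOMotorCollection` whose documents have a numeric `balance` field.

```python
async def transfer(client, accounts, source, target, amount):
    """Move an amount between two account documents in a single transaction."""

    async def callback(session):
        # Every operation must receive the session to join the transaction
        await accounts.update_one(
            {"_id": source}, {"$inc": {"balance": -amount}}, session=session
        )
        await accounts.update_one(
            {"_id": target}, {"$inc": {"balance": amount}}, session=session
        )
        return amount

    async with await client.start_session() as session:
        # with_transaction starts the transaction, runs the callback,
        # commits on success, and returns the callback's result
        return await session.with_transaction(callback)
```

Because the callback may be invoked more than once if the driver retries, it should avoid side effects outside the database.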

Types

from typing import Any, Optional, Union, Dict, List
import datetime  # signatures in this document refer to datetime.datetime
import tornado.concurrent
from pymongo.results import InsertOneResult, InsertManyResult, UpdateResult, DeleteResult
from bson import Binary, CodecOptions

# Common type aliases used throughout Motor
Document = Dict[str, Any]
Filter = Dict[str, Any]
Update = Dict[str, Any]
Pipeline = List[Dict[str, Any]]

# Key attributes of the pymongo.results classes imported above (reference only)
class InsertOneResult:
    inserted_id: Any
    acknowledged: bool

class InsertManyResult:
    inserted_ids: List[Any]
    acknowledged: bool

class UpdateResult:
    matched_count: int
    modified_count: int
    upserted_id: Optional[Any]
    acknowledged: bool

class DeleteResult:
    deleted_count: int
    acknowledged: bool
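
As a small illustration of the result attributes listed above, the helper below turns an `UpdateResult` into a log message. `describe_update` is a hypothetical name. It checks `acknowledged` first because, with an unacknowledged write concern, the count attributes are not available.

```python
def describe_update(result):
    """Summarize the outcome of update_one or update_many as a short string."""
    if not result.acknowledged:
        # Unacknowledged writes carry no counts from the server
        return "unacknowledged write"
    if result.upserted_id is not None:
        # An upsert inserted a new document instead of modifying one
        return f"upserted {result.upserted_id!r}"
    return f"matched {result.matched_count}, modified {result.modified_count}"
```

A typical call would be `print(describe_update(await collection.update_one(query, update)))` inside a coroutine.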