Non-blocking MongoDB driver for Python asyncio and Tornado applications
npx @tessl/cli install tessl/pypi-motor@3.7.0A full-featured, non-blocking MongoDB driver for Python asyncio and Tornado applications. Motor provides a coroutine-based API for non-blocking access to MongoDB, enabling high-throughput database operations in asynchronous Python applications.
pip install motorFor asyncio applications:
import motor.motor_asyncio as motorFor Tornado applications:
import motor.motor_tornado as motorSpecific imports:
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase, AsyncIOMotorCollection
from motor.motor_tornado import MotorClient, MotorDatabase, MotorCollectionimport asyncio
import motor.motor_asyncio
async def main():
# Connect to MongoDB
client = motor.motor_asyncio.AsyncIOMotorClient('mongodb://localhost:27017')
# Get database and collection
db = client.my_database
collection = db.my_collection
# Insert a document
result = await collection.insert_one({'name': 'Alice', 'age': 30})
print(f"Inserted document with ID: {result.inserted_id}")
# Find documents
async for document in collection.find({'age': {'$gte': 18}}):
print(document)
# Find one document
user = await collection.find_one({'name': 'Alice'})
print(f"Found user: {user}")
# Update a document
await collection.update_one(
{'name': 'Alice'},
{'$set': {'age': 31}}
)
# Delete a document
await collection.delete_one({'name': 'Alice'})
# Close the connection
client.close()
asyncio.run(main())import tornado.ioloop
import motor.motor_tornado
def main():
client = motor.motor_tornado.MotorClient('mongodb://localhost:27017')
db = client.my_database
collection = db.my_collection
# Insert and handle with callback
future = collection.insert_one({'name': 'Bob', 'age': 25})
tornado.ioloop.IOLoop.current().run_sync(lambda: future)
client.close()
if __name__ == '__main__':
main()Motor uses a sophisticated metaprogramming system to create framework-specific classes from framework-agnostic base classes:
motor.core define all MongoDB operationsmotor_asyncio and motor_tornado adapt the core for each frameworkThis design enables Motor to provide identical functionality for both asyncio and Tornado while maintaining a single codebase for core MongoDB operations.
Core MongoDB client, database, and collection operations optimized for Python's asyncio framework. Supports async/await syntax and native asyncio.Future objects.
class AsyncIOMotorClient:
def __init__(self, host='localhost', port=27017, **kwargs): ...
def get_database(self, name=None, **kwargs) -> AsyncIOMotorDatabase: ...
def get_default_database(self, **kwargs) -> AsyncIOMotorDatabase: ...
async def list_databases(self, session=None, **kwargs): ...
async def list_database_names(self, session=None, **kwargs) -> list[str]: ...
async def server_info(self) -> dict: ...
async def start_session(self, **kwargs): ...
def close(self) -> None: ...
class AsyncIOMotorDatabase:
def get_collection(self, name, **kwargs) -> AsyncIOMotorCollection: ...
async def create_collection(self, name, **kwargs) -> AsyncIOMotorCollection: ...
async def drop_collection(self, name_or_collection, **kwargs): ...
async def list_collection_names(self, **kwargs) -> list[str]: ...
async def command(self, command, **kwargs): ...
class AsyncIOMotorCollection:
async def insert_one(self, document, **kwargs): ...
async def insert_many(self, documents, **kwargs): ...
async def find_one(self, filter=None, **kwargs): ...
def find(self, filter=None, **kwargs) -> AsyncIOMotorCursor: ...
async def update_one(self, filter, update, **kwargs): ...
async def update_many(self, filter, update, **kwargs): ...
async def delete_one(self, filter, **kwargs): ...
async def delete_many(self, filter, **kwargs): ...
async def count_documents(self, filter, **kwargs) -> int: ...
class AsyncIOMotorClientSession:
async def start_transaction(self, **kwargs): ...
async def commit_transaction(self): ...
async def abort_transaction(self): ...
async def with_transaction(self, coro, **kwargs): ...
async def end_session(self): ...
async def __aenter__(self): ...
async def __aexit__(self, exc_type, exc_val, exc_tb): ...Core MongoDB client, database, and collection operations optimized for Tornado's IOLoop and Future objects. Provides callback-based and Future-based async operations.
class MotorClient:
def __init__(self, host='localhost', port=27017, **kwargs): ...
def get_database(self, name=None, **kwargs) -> MotorDatabase: ...
def get_default_database(self, **kwargs) -> MotorDatabase: ...
def list_databases(self, session=None, **kwargs) -> tornado.concurrent.Future: ...
def list_database_names(self, session=None, **kwargs) -> tornado.concurrent.Future: ...
def server_info(self) -> tornado.concurrent.Future: ...
def start_session(self, **kwargs) -> tornado.concurrent.Future: ...
def close(self) -> None: ...
class MotorDatabase:
def get_collection(self, name, **kwargs) -> MotorCollection: ...
def create_collection(self, name, **kwargs) -> tornado.concurrent.Future: ...
def drop_collection(self, name_or_collection, **kwargs) -> tornado.concurrent.Future: ...
def list_collection_names(self, **kwargs) -> tornado.concurrent.Future: ...
def command(self, command, **kwargs) -> tornado.concurrent.Future: ...
class MotorCollection:
def insert_one(self, document, **kwargs) -> tornado.concurrent.Future: ...
def insert_many(self, documents, **kwargs) -> tornado.concurrent.Future: ...
def find_one(self, filter=None, **kwargs) -> tornado.concurrent.Future: ...
def find(self, filter=None, **kwargs) -> MotorCursor: ...
def update_one(self, filter, update, **kwargs) -> tornado.concurrent.Future: ...
def update_many(self, filter, update, **kwargs) -> tornado.concurrent.Future: ...
def delete_one(self, filter, **kwargs) -> tornado.concurrent.Future: ...
def delete_many(self, filter, **kwargs) -> tornado.concurrent.Future: ...
def count_documents(self, filter, **kwargs) -> tornado.concurrent.Future: ...
class MotorClientSession:
def start_transaction(self, **kwargs): ...
def commit_transaction(self) -> tornado.concurrent.Future: ...
def abort_transaction(self) -> tornado.concurrent.Future: ...
def with_transaction(self, coro, **kwargs) -> tornado.concurrent.Future: ...
def end_session(self) -> tornado.concurrent.Future: ...GridFS support for storing and retrieving large files in MongoDB. Provides streaming file operations for both asyncio and Tornado frameworks.
class AsyncIOMotorGridFSBucket:
def __init__(self, database, bucket_name='fs', **kwargs): ...
async def upload_from_stream(self, filename, source, **kwargs): ...
async def upload_from_stream_with_id(self, file_id, filename, source, **kwargs): ...
async def open_download_stream(self, file_id, **kwargs) -> AsyncIOMotorGridOut: ...
async def open_download_stream_by_name(self, filename, **kwargs) -> AsyncIOMotorGridOut: ...
def open_upload_stream(self, filename, **kwargs) -> AsyncIOMotorGridIn: ...
def open_upload_stream_with_id(self, file_id, filename, **kwargs) -> AsyncIOMotorGridIn: ...
async def delete(self, file_id, **kwargs): ...
async def rename(self, file_id, new_filename, **kwargs): ...
class AsyncIOMotorGridOut:
# Properties
_id: Any
filename: str
length: int
content_type: str
upload_date: datetime.datetime
metadata: dict
# Methods
async def open(self): ...
async def read(self, size=-1) -> bytes: ...
async def readline(self, size=-1) -> bytes: ...
async def readchunk(self) -> bytes: ...
def seek(self, pos, whence=0) -> int: ...
def tell(self) -> int: ...
def close(self) -> None: ...
class AsyncIOMotorGridIn:
# Properties
_id: Any
filename: str
content_type: str
chunk_size: int
closed: bool
# Methods
async def write(self, data) -> None: ...
async def writelines(self, lines) -> None: ...
async def close(self) -> None: ...
async def abort(self) -> None: ...Cursor functionality for iterating over query results, command results, and change streams. Supports both async iteration and traditional cursor methods.
class AsyncIOMotorCursor:
def limit(self, limit) -> AsyncIOMotorCursor: ...
def skip(self, skip) -> AsyncIOMotorCursor: ...
def sort(self, key_or_list, direction=None) -> AsyncIOMotorCursor: ...
def batch_size(self, batch_size) -> AsyncIOMotorCursor: ...
async def to_list(self, length=None) -> list: ...
def __aiter__(self): ...
async def __anext__(self): ...
class AsyncIOMotorCommandCursor:
def batch_size(self, batch_size) -> AsyncIOMotorCommandCursor: ...
async def to_list(self, length=None) -> list: ...
def __aiter__(self): ...
async def __anext__(self): ...
class AsyncIOMotorLatentCommandCursor(AsyncIOMotorCommandCursor):
def batch_size(self, batch_size) -> AsyncIOMotorLatentCommandCursor: ...
async def to_list(self, length=None) -> list: ...
def __aiter__(self): ...
async def __anext__(self): ...
class AsyncIOMotorChangeStream:
async def next(self): ...
def __aiter__(self): ...
async def __anext__(self): ...
async def close(self): ...
def resume_token(self): ...Real-time change monitoring for watching database, collection, or document changes. Enables reactive applications that respond to data modifications.
def watch(pipeline=None, **kwargs) -> AsyncIOMotorChangeStream: ...
class AsyncIOMotorChangeStream:
async def next(self): ...
def __aiter__(self): ...
async def __anext__(self): ...
async def close(self): ...
@property
def resume_token(self): ...HTTP handlers for serving GridFS files through web frameworks. Includes optimized handlers for both Tornado and aiohttp with support for HTTP caching, range requests, and content negotiation.
# Tornado Integration
class GridFSHandler(tornado.web.RequestHandler):
def initialize(self, database, **kwargs): ...
async def get(self, filename, include_body=True): ...
async def head(self, filename): ...
# aiohttp Integration
class AIOHTTPGridFS:
def __init__(self, database, root_collection='fs', **kwargs): ...
async def __call__(self, request) -> aiohttp.web.Response: ...
def get_gridfs_file(bucket, filename, request): ...
def get_cache_time(filename, modified, mime_type) -> int: ...
def set_extra_headers(response, gridout) -> None: ...Encryption and decryption of fields for client-side field level encryption (CSFLE). Provides key management, field encryption/decryption, and secure data operations.
class AsyncIOMotorClientEncryption:
def __init__(
self,
kms_providers: dict,
key_vault_namespace: str,
key_vault_client: AsyncIOMotorClient,
codec_options: CodecOptions,
**kwargs
): ...
async def create_data_key(
self,
kms_provider: str,
master_key: Optional[dict] = None,
key_alt_names: Optional[list] = None,
key_material: Optional[bytes] = None
) -> Binary: ...
async def encrypt(
self,
value: Any,
algorithm: str,
key_id: Optional[Binary] = None,
key_alt_name: Optional[str] = None,
**kwargs
) -> Binary: ...
async def decrypt(self, value: Binary) -> Any: ...
async def close(self) -> None: ...# AsyncIO with connection pooling
client = motor.motor_asyncio.AsyncIOMotorClient(
'mongodb://localhost:27017',
maxPoolSize=10,
minPoolSize=5
)
# Always close connections
try:
# Your database operations
pass
finally:
client.close()import pymongo.errors
try:
await collection.insert_one(document)
except pymongo.errors.DuplicateKeyError:
print("Document already exists")
except pymongo.errors.ConnectionFailure:
print("Failed to connect to MongoDB")async def run_in_transaction():
async with await client.start_session() as session:
async with session.start_transaction():
await collection1.insert_one(doc1, session=session)
await collection2.update_one(filter, update, session=session)from typing import Any, Optional, Union, Dict, List
from datetime import datetime
import tornado.concurrent
from pymongo.results import InsertOneResult, InsertManyResult, UpdateResult, DeleteResult
from bson import Binary, CodecOptions
# Common type aliases used throughout Motor
Document = Dict[str, Any]
Filter = Dict[str, Any]
Update = Dict[str, Any]
Pipeline = List[Dict[str, Any]]
class InsertOneResult:
inserted_id: Any
acknowledged: bool
class InsertManyResult:
inserted_ids: List[Any]
acknowledged: bool
class UpdateResult:
matched_count: int
modified_count: int
upserted_id: Optional[Any]
acknowledged: bool
class DeleteResult:
deleted_count: int
acknowledged: bool