Non-blocking MongoDB driver for Python asyncio and Tornado applications
—
Core MongoDB client, database, and collection operations built around Tornado's IOLoop and Future objects. These classes integrate with Tornado applications: asynchronous operations return Tornado Futures, which can be awaited, yielded from generator-based coroutines, or given a completion callback.
The main entry point for Tornado-based MongoDB operations, providing connection management and database access with Tornado Future objects.
class MotorClient:
    """Main entry point for Tornado-based MongoDB operations.

    Provides connection management and database access. Methods annotated
    with ``tornado.concurrent.Future`` hand back a Future rather than a
    direct result.
    """

    def __init__(
        self,
        host: Union[str, List[str]] = 'localhost',
        port: int = 27017,
        document_class: type = dict,
        tz_aware: bool = False,
        connect: bool = True,
        io_loop=None,
        **kwargs
    ):
        """
        Create a new MotorClient connection to MongoDB.

        Parameters:
        - host: MongoDB host(s) to connect to
        - port: Port number for MongoDB connection
        - document_class: Default class for documents returned from queries
        - tz_aware: Whether datetime objects should be timezone-aware
        - connect: Whether to connect immediately or lazily
        - io_loop: Tornado IOLoop instance (optional, uses current if None)
        - **kwargs: Additional connection options (maxPoolSize, ssl, etc.)
        """

    def get_database(self, name: Optional[str] = None, **kwargs) -> MotorDatabase:
        """Get a database instance."""

    def get_default_database(self, **kwargs) -> MotorDatabase:
        """Get the default database specified in connection URI."""

    def list_databases(self, session=None, **kwargs) -> tornado.concurrent.Future:
        """List all databases on the MongoDB server (returns a Tornado Future)."""

    def list_database_names(self, session=None, **kwargs) -> tornado.concurrent.Future:
        """Get names of all databases on the server (returns a Tornado Future)."""

    def server_info(self) -> tornado.concurrent.Future:
        """Get information about the MongoDB server (returns a Tornado Future)."""

    def start_session(self, **kwargs) -> tornado.concurrent.Future:
        """Start a logical session for use with transactions (returns a Tornado Future)."""

    def drop_database(self, name_or_database: Union[str, MotorDatabase], session=None) -> tornado.concurrent.Future:
        """Drop a database, given by name or as a MotorDatabase (returns a Tornado Future)."""

    def close(self) -> None:
        """Close all connections to MongoDB."""

    def watch(self, pipeline: Optional[List[Dict[str, Any]]] = None, **kwargs) -> MotorChangeStream:
        """Watch for changes across all collections in all databases."""

    # Read-only properties
    @property
    def address(self) -> Optional[Tuple[str, int]]:
        """Current connection address."""

    @property
    def primary(self) -> Optional[Tuple[str, int]]:
        """Primary server address in replica set."""

    @property
    def secondaries(self) -> Set[Tuple[str, int]]:
        """Secondary server addresses in replica set."""

    @property
    def is_primary(self) -> bool:
        """Whether connected to a primary server."""

    @property
    def is_mongos(self) -> bool:
        """Whether connected to a mongos router."""


# Represents a MongoDB database with Tornado Future-based operations.
class MotorDatabase:
    """A MongoDB database with Tornado Future-based operations.

    Methods annotated with ``tornado.concurrent.Future`` hand back a Future
    rather than a direct result.
    """

    @property
    def name(self) -> str:
        """Database name."""

    @property
    def client(self) -> MotorClient:
        """The client that owns this database."""

    def get_collection(self, name: str, **kwargs) -> MotorCollection:
        """Get a collection in this database."""

    def __getitem__(self, name: str) -> MotorCollection:
        """Get a collection using dictionary-style access."""

    def __getattr__(self, name: str) -> MotorCollection:
        """Get a collection using attribute-style access."""

    def create_collection(
        self,
        name: str,
        codec_options=None,
        read_preference=None,
        write_concern=None,
        read_concern=None,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Create a new collection in this database (returns a Tornado Future)."""

    def drop_collection(
        self,
        name_or_collection: Union[str, MotorCollection],
        session=None
    ) -> tornado.concurrent.Future:
        """Drop a collection, given by name or as a MotorCollection (returns a Tornado Future)."""

    def list_collection_names(
        self,
        session=None,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Get names of all collections in this database (returns a Tornado Future)."""

    def list_collections(
        self,
        session=None,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """List collections with metadata (returns a Tornado Future)."""

    def command(
        self,
        command: Union[str, Dict[str, Any]],
        value: Any = 1,
        check: bool = True,
        allowable_errors: Optional[List[str]] = None,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Execute a database command (returns a Tornado Future)."""

    def aggregate(
        self,
        pipeline: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> MotorCommandCursor:
        """Execute an aggregation pipeline on the database."""

    def watch(
        self,
        pipeline: Optional[List[Dict[str, Any]]] = None,
        session=None,
        **kwargs
    ) -> MotorChangeStream:
        """Watch for changes on all collections in this database."""


# Represents a MongoDB collection with Tornado Future-based document operations.
class MotorCollection:
    """A MongoDB collection with Tornado Future-based document operations.

    Methods annotated with ``tornado.concurrent.Future`` hand back a Future
    rather than a direct result; ``find``, ``aggregate``, ``list_indexes``
    and ``watch`` are annotated as returning cursor / change-stream objects.
    """

    @property
    def name(self) -> str:
        """Collection name."""

    @property
    def full_name(self) -> str:
        """Full collection name (database.collection)."""

    @property
    def database(self) -> MotorDatabase:
        """The database that owns this collection."""

    # Insert Operations
    def insert_one(
        self,
        document: Dict[str, Any],
        bypass_document_validation: bool = False,
        session=None
    ) -> tornado.concurrent.Future:
        """Insert a single document (returns a Tornado Future)."""

    def insert_many(
        self,
        documents: List[Dict[str, Any]],
        ordered: bool = True,
        bypass_document_validation: bool = False,
        session=None
    ) -> tornado.concurrent.Future:
        """Insert multiple documents (returns a Tornado Future)."""

    # Find Operations
    def find_one(
        self,
        filter: Optional[Dict[str, Any]] = None,
        *args,
        projection: Optional[Dict[str, Any]] = None,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Find a single document (returns a Tornado Future)."""

    def find(
        self,
        filter: Optional[Dict[str, Any]] = None,
        projection: Optional[Dict[str, Any]] = None,
        skip: int = 0,
        limit: int = 0,
        no_cursor_timeout: bool = False,
        cursor_type=None,
        sort: Optional[List[Tuple[str, int]]] = None,
        allow_partial_results: bool = False,
        batch_size: int = 0,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        max_time_ms: Optional[int] = None,
        session=None,
        **kwargs
    ) -> MotorCursor:
        """Find multiple documents, returns a cursor."""

    def find_one_and_delete(
        self,
        filter: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Find and delete a single document (returns a Tornado Future)."""

    def find_one_and_replace(
        self,
        filter: Dict[str, Any],
        replacement: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        upsert: bool = False,
        return_document: bool = False,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Find and replace a single document (returns a Tornado Future)."""

    def find_one_and_update(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        upsert: bool = False,
        return_document: bool = False,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Find and update a single document (returns a Tornado Future)."""

    # Update Operations
    def update_one(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        upsert: bool = False,
        bypass_document_validation: bool = False,
        collation=None,
        array_filters: Optional[List[Dict[str, Any]]] = None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Update a single document (returns a Tornado Future)."""

    def update_many(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        upsert: bool = False,
        array_filters: Optional[List[Dict[str, Any]]] = None,
        bypass_document_validation: bool = False,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Update multiple documents (returns a Tornado Future)."""

    def replace_one(
        self,
        filter: Dict[str, Any],
        replacement: Dict[str, Any],
        upsert: bool = False,
        bypass_document_validation: bool = False,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Replace a single document (returns a Tornado Future)."""

    # Delete Operations
    def delete_one(
        self,
        filter: Dict[str, Any],
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Delete a single document (returns a Tornado Future)."""

    def delete_many(
        self,
        filter: Dict[str, Any],
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Delete multiple documents (returns a Tornado Future)."""

    # Count Operations
    def count_documents(
        self,
        filter: Dict[str, Any],
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Count documents matching filter (returns a Tornado Future)."""

    def estimated_document_count(self, **kwargs) -> tornado.concurrent.Future:
        """Estimate total document count (returns a Tornado Future)."""

    # Index Operations
    def create_index(
        self,
        keys: Union[str, List[Tuple[str, int]]],
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Create a single index (returns a Tornado Future)."""

    def create_indexes(
        self,
        indexes: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Create multiple indexes (returns a Tornado Future)."""

    def drop_index(
        self,
        index: Union[str, List[Tuple[str, int]]],
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Drop a single index (returns a Tornado Future)."""

    def list_indexes(self, session=None) -> MotorCommandCursor:
        """List all indexes on the collection."""

    # Aggregation Operations
    def aggregate(
        self,
        pipeline: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> MotorCommandCursor:
        """Execute an aggregation pipeline."""

    def distinct(
        self,
        key: str,
        filter: Optional[Dict[str, Any]] = None,
        session=None,
        **kwargs
    ) -> tornado.concurrent.Future:
        """Get distinct values for a field (returns a Tornado Future)."""

    # Bulk Operations
    def bulk_write(
        self,
        requests: List[Any],
        ordered: bool = True,
        bypass_document_validation: bool = False,
        session=None
    ) -> tornado.concurrent.Future:
        """Execute bulk write operations (returns a Tornado Future)."""

    # Change Streams
    def watch(
        self,
        pipeline: Optional[List[Dict[str, Any]]] = None,
        full_document: Optional[str] = None,
        resume_after: Optional[Dict[str, Any]] = None,
        max_await_time_ms: Optional[int] = None,
        batch_size: Optional[int] = None,
        collation=None,
        start_at_operation_time=None,
        session=None,
        start_after: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> MotorChangeStream:
        """Watch for changes on the collection."""

    # Collection Management
    def drop(self, session=None) -> tornado.concurrent.Future:
        """Drop the collection (returns a Tornado Future)."""

    def rename(self, new_name: str, session=None, **kwargs) -> tornado.concurrent.Future:
        """Rename the collection (returns a Tornado Future)."""


# Client session for transaction support and causally consistent reads in Tornado applications.
class MotorClientSession:
    """
    A session for ordering sequential operations and transactions.

    Provides transaction support and causally consistent reads in Tornado
    applications.
    Created via MotorClient.start_session(), not directly instantiated.
    """

    # Properties
    @property
    def client(self) -> MotorClient:
        """The client this session was created from."""

    @property
    def cluster_time(self) -> Optional[Dict[str, Any]]:
        """The cluster time returned by the last operation."""

    @property
    def has_ended(self) -> bool:
        """Whether this session has ended."""

    @property
    def in_transaction(self) -> bool:
        """Whether this session is in an active transaction."""

    @property
    def operation_time(self) -> Optional[Any]:
        """The operation time returned by the last operation."""

    @property
    def options(self) -> Dict[str, Any]:
        """The options used to create this session."""

    @property
    def session_id(self) -> Dict[str, Any]:
        """A BSON document identifying this session."""

    # Transaction Methods
    def start_transaction(
        self,
        read_concern: Optional[Any] = None,
        write_concern: Optional[Any] = None,
        read_preference: Optional[Any] = None,
        max_commit_time_ms: Optional[int] = None
    ) -> Any:
        """
        Start a multi-statement transaction.

        Returns a context manager for the transaction.
        Use with context manager syntax.

        Parameters:
        - read_concern: Read concern for the transaction
        - write_concern: Write concern for the transaction
        - read_preference: Read preference for the transaction
        - max_commit_time_ms: Maximum time for commit operation

        Returns:
        Transaction context manager
        """

    def commit_transaction(self) -> tornado.concurrent.Future:
        """Commit the current transaction (returns a Tornado Future)."""

    def abort_transaction(self) -> tornado.concurrent.Future:
        """Abort the current transaction (returns a Tornado Future)."""

    def with_transaction(
        self,
        coro: Callable,
        read_concern: Optional[Any] = None,
        write_concern: Optional[Any] = None,
        read_preference: Optional[Any] = None,
        max_commit_time_ms: Optional[int] = None
    ) -> tornado.concurrent.Future:
        """
        Execute a coroutine within a transaction.

        Automatically handles transaction retry logic for transient errors.
        Will retry the entire transaction for up to 120 seconds.

        Parameters:
        - coro: Function that takes this session as first argument
        - read_concern: Read concern for the transaction
        - write_concern: Write concern for the transaction
        - read_preference: Read preference for the transaction
        - max_commit_time_ms: Maximum time for commit operation

        Returns:
        Tornado Future with return value from the coroutine
        """

    # Session Management
    def end_session(self) -> tornado.concurrent.Future:
        """End this session (returns a Tornado Future)."""

    def advance_cluster_time(self, cluster_time: Dict[str, Any]) -> None:
        """Advance the cluster time for this session."""

    def advance_operation_time(self, operation_time: Any) -> None:
        """Advance the operation time for this session."""


import tornado.ioloop
import motor.motor_tornado
async def example():
    """Walk through insert, find, update and delete calls on one collection.

    Each operation is awaited in turn and its outcome printed. The client is
    closed in a ``finally`` clause so its connections are released even when
    one of the operations raises.
    """
    client = motor.motor_tornado.MotorClient()
    try:
        db = client.test_database
        collection = db.test_collection

        # Insert operations
        result = await collection.insert_one({"name": "Alice", "age": 30})
        print(f"Inserted ID: {result.inserted_id}")

        results = await collection.insert_many([
            {"name": "Bob", "age": 25},
            {"name": "Charlie", "age": 35}
        ])
        print(f"Inserted IDs: {results.inserted_ids}")

        # Find operations
        document = await collection.find_one({"name": "Alice"})
        print(f"Found: {document}")

        # Update operations
        result = await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )
        print(f"Modified {result.modified_count} document(s)")

        # Delete operations
        result = await collection.delete_one({"name": "Alice"})
        print(f"Deleted {result.deleted_count} document(s)")
    finally:
        # Mirrors gen_example(): always release the connections
        client.close()
# Run with Tornado IOLoop: run_sync() drives the loop until example() completes
tornado.ioloop.IOLoop.current().run_sync(example)
import tornado.ioloop
import motor.motor_tornado
def handle_result(result, error):
    """Print the outcome of an operation, then stop the IOLoop.

    Parameters:
    - result: Value produced by the operation (ignored when error is set)
    - error: Exception raised by the operation, or None on success
    """
    if error is not None:
        print(f"Error: {error}")
    else:
        print(f"Result: {result}")
    tornado.ioloop.IOLoop.current().stop()


def callback_example():
    """Schedule a find_one() and report its outcome from a Future callback."""
    client = motor.motor_tornado.MotorClient()
    collection = client.test_database.test_collection

    def on_done(f):
        # f.result() re-raises the exception of a failed operation, so read
        # the exception first and only fetch the result on success.
        error = f.exception()
        result = None if error is not None else f.result()
        client.close()
        handle_result(result, error)

    # Using callbacks (deprecated style)
    future = collection.find_one({"name": "Alice"})
    tornado.ioloop.IOLoop.current().add_future(future, on_done)


# callback_example() returns None rather than an awaitable, so run_sync() would
# stop the loop as soon as it returns, before the query has completed. Start
# the loop explicitly instead; handle_result() stops it.
tornado.ioloop.IOLoop.current().add_callback(callback_example)
tornado.ioloop.IOLoop.current().start()
import tornado.gen
import tornado.ioloop
import motor.motor_tornado
@tornado.gen.coroutine
def gen_example():
    """Insert one document and read it back, yielding each Future in turn.

    Written as a generator-based (legacy) coroutine; the client is closed
    whether or not the operations succeed.
    """
    mongo = motor.motor_tornado.MotorClient()
    coll = mongo.test_database.test_collection

    # Generator-based coroutines (legacy)
    try:
        inserted = yield coll.insert_one({"name": "test"})
        print(f"Inserted: {inserted.inserted_id}")

        found = yield coll.find_one({"name": "test"})
        print(f"Found: {found}")
    finally:
        mongo.close()
# gen_example() returns a Future, so run_sync() runs the loop until it resolves
tornado.ioloop.IOLoop.current().run_sync(gen_example)
import tornado.web
import tornado.ioloop
import motor.motor_tornado
class MainHandler(tornado.web.RequestHandler):
    """Serve up to ten documents from the ``users`` collection as JSON."""

    async def get(self):
        """Read users from the database stored in the application settings.

        The database handle is looked up under the ``'db'`` settings key and
        the response body is ``{"users": [...]}``.
        """
        db = self.application.settings['db']
        collection = db.users

        # Find users
        users = []
        async for user in collection.find().limit(10):
            # write() JSON-encodes dicts, and an ObjectId "_id" is not JSON
            # serializable, so send it as its string form instead.
            # NOTE(review): other BSON-only field types (e.g. datetimes) would
            # need similar handling - confirm against the stored schema.
            if "_id" in user:
                user["_id"] = str(user["_id"])
            users.append(user)

        self.write({"users": users})
def make_app():
    """Build the Tornado application, exposing the database as the ``db`` setting.

    A new MotorClient is created on each call and its ``test_database`` is
    passed to the Application, where handlers read it from the settings.
    """
    mongo_client = motor.motor_tornado.MotorClient()
    routes = [
        (r"/", MainHandler),
    ]
    return tornado.web.Application(routes, db=mongo_client.test_database)
if __name__ == "__main__":
    # Build the app, listen on port 8888, and run the IOLoop until stopped
    app = make_app()
    app.listen(8888)
    tornado.ioloop.IOLoop.current().start()

# Typing names used by the API signatures; Callable is required by
# MotorClientSession.with_transaction(coro: Callable, ...).
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
import tornado.concurrent
# Placeholder aliases: each name below is bound to typing.Any here, not to a
# concrete Motor class.
# NOTE(review): if this listing shares a module with the MotorClientSession
# class stub, the first alias rebinds that name - confirm the snippets are
# meant to be read separately.
MotorClientSession = Any # Actual session type from motor
MotorCommandCursor = Any # Command cursor type
MotorCursor = Any # Query cursor type
MotorChangeStream = Any # Change stream type

Install with Tessl CLI
npx tessl i tessl/pypi-motor