Non-blocking MongoDB driver for Python asyncio and Tornado applications
—
Core MongoDB client, database, and collection operations optimized for Python's asyncio framework. These classes provide native async/await syntax support and integrate seamlessly with asyncio applications.
The main entry point for asyncio-based MongoDB operations, providing connection management and database access.
class AsyncIOMotorClient:
    """Main entry point for asyncio-based MongoDB operations.

    Provides connection management and database access.
    """

    def __init__(
        self,
        host: Union[str, List[str]] = 'localhost',
        port: int = 27017,
        document_class: type = dict,
        tz_aware: bool = False,
        connect: bool = True,
        **kwargs
    ):
        """
        Create a new AsyncIOMotorClient connection to MongoDB.

        Parameters:
        - host: MongoDB host(s) to connect to
        - port: Port number for MongoDB connection
        - document_class: Default class for documents returned from queries
        - tz_aware: Whether datetime objects should be timezone-aware
        - connect: Whether to connect immediately or lazily
        - **kwargs: Additional connection options (maxPoolSize, ssl, etc.)
        """

    def get_database(self, name: Optional[str] = None, **kwargs) -> AsyncIOMotorDatabase:
        """Get a database instance."""

    def get_default_database(self, **kwargs) -> AsyncIOMotorDatabase:
        """Get the default database specified in connection URI."""

    async def list_databases(self, session=None, **kwargs) -> AsyncIOMotorCommandCursor:
        """List all databases on the MongoDB server."""

    async def list_database_names(self, session=None, **kwargs) -> List[str]:
        """Get names of all databases on the server."""

    async def server_info(self, session=None) -> Dict[str, Any]:
        """Get information about the MongoDB server.

        Parameters:
        - session: Optional client session to run the request in
        """

    async def start_session(self, **kwargs) -> AsyncIOMotorClientSession:
        """Start a logical session for use with transactions."""

    async def drop_database(
        self,
        name_or_database: Union[str, AsyncIOMotorDatabase],
        session=None
    ) -> None:
        """Drop a database, given either its name or a database instance."""

    def close(self) -> None:
        """Close all connections to MongoDB."""

    def watch(
        self,
        pipeline: Optional[List[Dict[str, Any]]] = None,
        **kwargs
    ) -> AsyncIOMotorChangeStream:
        """Watch for changes across all collections in all databases."""

    # Read-only properties
    @property
    def address(self) -> Optional[Tuple[str, int]]:
        """Current connection address."""

    @property
    def primary(self) -> Optional[Tuple[str, int]]:
        """Primary server address in replica set."""

    @property
    def secondaries(self) -> Set[Tuple[str, int]]:
        """Secondary server addresses in replica set."""

    @property
    def is_primary(self) -> bool:
        """Whether connected to a primary server."""

    @property
    def is_mongos(self) -> bool:
        """Whether connected to a mongos router."""


# Represents a MongoDB database, providing access to collections and database-level operations.
class AsyncIOMotorDatabase:
    """A MongoDB database: gives access to its collections and database-level operations."""

    @property
    def name(self) -> str:
        """Database name."""

    @property
    def client(self) -> AsyncIOMotorClient:
        """The client that owns this database."""

    def get_collection(self, name: str, **kwargs) -> AsyncIOMotorCollection:
        """Get a collection in this database."""

    def __getitem__(self, name: str) -> AsyncIOMotorCollection:
        """Get a collection using dictionary-style access."""

    def __getattr__(self, name: str) -> AsyncIOMotorCollection:
        """Get a collection using attribute-style access."""

    async def create_collection(
        self,
        name: str,
        codec_options=None,
        read_preference=None,
        write_concern=None,
        read_concern=None,
        session=None,
        **kwargs
    ) -> AsyncIOMotorCollection:
        """Create a new collection in this database."""

    async def drop_collection(
        self,
        name_or_collection: Union[str, AsyncIOMotorCollection],
        session=None
    ) -> None:
        """Drop a collection, given either its name or a collection instance."""

    async def list_collection_names(
        self,
        session=None,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> List[str]:
        """Get names of all collections in this database."""

    async def list_collections(
        self,
        session=None,
        filter: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> AsyncIOMotorCommandCursor:
        """List collections with metadata."""

    async def command(
        self,
        command: Union[str, Dict[str, Any]],
        value: Any = 1,
        check: bool = True,
        allowable_errors: Optional[List[str]] = None,
        session=None,
        **kwargs
    ) -> Dict[str, Any]:
        """Execute a database command."""

    def aggregate(
        self,
        pipeline: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> AsyncIOMotorCommandCursor:
        """Execute an aggregation pipeline on the database."""

    def watch(
        self,
        pipeline: Optional[List[Dict[str, Any]]] = None,
        session=None,
        **kwargs
    ) -> AsyncIOMotorChangeStream:
        """Watch for changes on all collections in this database."""


# Represents a MongoDB collection, providing document-level operations like insert, find, update, and delete.
class AsyncIOMotorCollection:
    """A MongoDB collection: document-level operations such as insert, find, update, and delete."""

    @property
    def name(self) -> str:
        """Collection name."""

    @property
    def full_name(self) -> str:
        """Full collection name (database.collection)."""

    @property
    def database(self) -> AsyncIOMotorDatabase:
        """The database that owns this collection."""

    # Insert Operations
    async def insert_one(
        self,
        document: Dict[str, Any],
        bypass_document_validation: bool = False,
        session=None
    ) -> InsertOneResult:
        """Insert a single document."""

    async def insert_many(
        self,
        documents: List[Dict[str, Any]],
        ordered: bool = True,
        bypass_document_validation: bool = False,
        session=None
    ) -> InsertManyResult:
        """Insert multiple documents."""

    # Find Operations
    async def find_one(
        self,
        filter: Optional[Dict[str, Any]] = None,
        *args,
        projection: Optional[Dict[str, Any]] = None,
        session=None,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Find a single document."""

    def find(
        self,
        filter: Optional[Dict[str, Any]] = None,
        projection: Optional[Dict[str, Any]] = None,
        skip: int = 0,
        limit: int = 0,
        no_cursor_timeout: bool = False,
        cursor_type=None,
        sort: Optional[List[Tuple[str, int]]] = None,
        allow_partial_results: bool = False,
        batch_size: int = 0,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        max_time_ms: Optional[int] = None,
        session=None,
        **kwargs
    ) -> AsyncIOMotorCursor:
        """Find multiple documents, returns a cursor."""

    async def find_one_and_delete(
        self,
        filter: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        session=None,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Find and delete a single document."""

    async def find_one_and_replace(
        self,
        filter: Dict[str, Any],
        replacement: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        upsert: bool = False,
        return_document: bool = False,
        session=None,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Find and replace a single document."""

    async def find_one_and_update(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        projection: Optional[Dict[str, Any]] = None,
        sort: Optional[List[Tuple[str, int]]] = None,
        upsert: bool = False,
        return_document: bool = False,
        session=None,
        **kwargs
    ) -> Optional[Dict[str, Any]]:
        """Find and update a single document."""

    # Update Operations
    async def update_one(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        upsert: bool = False,
        bypass_document_validation: bool = False,
        collation=None,
        array_filters: Optional[List[Dict[str, Any]]] = None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> UpdateResult:
        """Update a single document."""

    async def update_many(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        upsert: bool = False,
        array_filters: Optional[List[Dict[str, Any]]] = None,
        bypass_document_validation: bool = False,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> UpdateResult:
        """Update multiple documents."""

    async def replace_one(
        self,
        filter: Dict[str, Any],
        replacement: Dict[str, Any],
        upsert: bool = False,
        bypass_document_validation: bool = False,
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> UpdateResult:
        """Replace a single document."""

    # Delete Operations
    async def delete_one(
        self,
        filter: Dict[str, Any],
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> DeleteResult:
        """Delete a single document."""

    async def delete_many(
        self,
        filter: Dict[str, Any],
        collation=None,
        hint: Optional[Union[str, List[Tuple[str, int]]]] = None,
        session=None
    ) -> DeleteResult:
        """Delete multiple documents."""

    # Count Operations
    async def count_documents(
        self,
        filter: Dict[str, Any],
        session=None,
        **kwargs
    ) -> int:
        """Count documents matching filter."""

    async def estimated_document_count(self, **kwargs) -> int:
        """Estimate total document count."""

    # Index Operations
    async def create_index(
        self,
        keys: Union[str, List[Tuple[str, int]]],
        session=None,
        **kwargs
    ) -> str:
        """Create a single index."""

    async def create_indexes(
        self,
        indexes: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> List[str]:
        """Create multiple indexes."""

    async def drop_index(
        self,
        index: Union[str, List[Tuple[str, int]]],
        session=None,
        **kwargs
    ) -> None:
        """Drop a single index."""

    def list_indexes(self, session=None) -> AsyncIOMotorCommandCursor:
        """List all indexes on the collection."""

    # Aggregation Operations
    def aggregate(
        self,
        pipeline: List[Dict[str, Any]],
        session=None,
        **kwargs
    ) -> AsyncIOMotorCommandCursor:
        """Execute an aggregation pipeline."""

    # distinct is a coroutine in Motor: awaiting it yields the list of values,
    # not a cursor.
    async def distinct(
        self,
        key: str,
        filter: Optional[Dict[str, Any]] = None,
        session=None,
        **kwargs
    ) -> List[Any]:
        """Get distinct values for a field."""

    # Bulk Operations
    # bulk_write is a coroutine in Motor and must be awaited.
    async def bulk_write(
        self,
        requests: List[Any],
        ordered: bool = True,
        bypass_document_validation: bool = False,
        session=None
    ) -> Any:
        """Execute bulk write operations.

        Awaiting the call yields the bulk write result object.
        """

    # Change Streams
    def watch(
        self,
        pipeline: Optional[List[Dict[str, Any]]] = None,
        full_document: Optional[str] = None,
        resume_after: Optional[Dict[str, Any]] = None,
        max_await_time_ms: Optional[int] = None,
        batch_size: Optional[int] = None,
        collation=None,
        start_at_operation_time=None,
        session=None,
        start_after: Optional[Dict[str, Any]] = None,
        **kwargs
    ) -> AsyncIOMotorChangeStream:
        """Watch for changes on the collection."""

    # Collection Management
    async def drop(self, session=None) -> None:
        """Drop the collection."""

    async def rename(self, new_name: str, session=None, **kwargs) -> None:
        """Rename the collection."""


# Client session for transaction support and causally consistent reads in AsyncIO applications.
class AsyncIOMotorClientSession:
    """
    A session for ordering sequential operations and transactions.

    Created via AsyncIOMotorClient.start_session(), not directly instantiated.
    """

    # Properties
    @property
    def client(self) -> AsyncIOMotorClient:
        """The client this session was created from."""

    @property
    def cluster_time(self) -> Optional[Dict[str, Any]]:
        """The cluster time returned by the last operation."""

    @property
    def has_ended(self) -> bool:
        """Whether this session has ended."""

    @property
    def in_transaction(self) -> bool:
        """Whether this session is in an active transaction."""

    @property
    def operation_time(self) -> Optional[Any]:
        """The operation time returned by the last operation."""

    @property
    def options(self) -> Dict[str, Any]:
        """The options used to create this session."""

    @property
    def session_id(self) -> Dict[str, Any]:
        """A BSON document identifying this session."""

    # Transaction Methods
    def start_transaction(
        self,
        read_concern: Optional[Any] = None,
        write_concern: Optional[Any] = None,
        read_preference: Optional[Any] = None,
        max_commit_time_ms: Optional[int] = None
    ) -> Any:
        """
        Start a multi-statement transaction.

        Returns a context manager for the transaction.
        Use with async context manager syntax.

        Parameters:
        - read_concern: Read concern for the transaction
        - write_concern: Write concern for the transaction
        - read_preference: Read preference for the transaction
        - max_commit_time_ms: Maximum time for commit operation

        Returns:
        Transaction context manager
        """

    async def commit_transaction(self) -> None:
        """Commit the current transaction."""

    async def abort_transaction(self) -> None:
        """Abort the current transaction."""

    async def with_transaction(
        self,
        coro: Callable,
        read_concern: Optional[Any] = None,
        write_concern: Optional[Any] = None,
        read_preference: Optional[Any] = None,
        max_commit_time_ms: Optional[int] = None
    ) -> Any:
        """
        Execute a coroutine within a transaction.

        Automatically handles transaction retry logic for transient errors.
        Will retry the entire transaction for up to 120 seconds.

        Parameters:
        - coro: Async function that takes this session as first argument
        - read_concern: Read concern for the transaction
        - write_concern: Write concern for the transaction
        - read_preference: Read preference for the transaction
        - max_commit_time_ms: Maximum time for commit operation

        Returns:
        Return value from the coroutine
        """

    # Session Management
    async def end_session(self) -> None:
        """End this session."""

    def advance_cluster_time(self, cluster_time: Dict[str, Any]) -> None:
        """Advance the cluster time for this session."""

    def advance_operation_time(self, operation_time: Any) -> None:
        """Advance the operation time for this session."""

    # Context Manager Protocol
    async def __aenter__(self) -> AsyncIOMotorClientSession:
        """Async context manager entry."""

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        """Async context manager exit."""


import asyncio
import motor.motor_asyncio
async def example():
    """Run one insert, find, update and delete round trip on a test collection."""
    client = motor.motor_asyncio.AsyncIOMotorClient()
    try:
        db = client.test_database
        collection = db.test_collection

        # Insert operations
        result = await collection.insert_one({"name": "Alice", "age": 30})
        print(f"Inserted ID: {result.inserted_id}")
        results = await collection.insert_many([
            {"name": "Bob", "age": 25},
            {"name": "Charlie", "age": 35}
        ])
        print(f"Inserted IDs: {results.inserted_ids}")

        # Find operations
        document = await collection.find_one({"name": "Alice"})
        print(f"Found: {document}")

        # Update operations
        result = await collection.update_one(
            {"name": "Alice"},
            {"$set": {"age": 31}}
        )
        print(f"Modified {result.modified_count} document(s)")

        # Delete operations
        result = await collection.delete_one({"name": "Alice"})
        print(f"Deleted {result.deleted_count} document(s)")
    finally:
        # Close the client even when one of the operations above raises.
        client.close()


asyncio.run(example())


async def cursor_example():
    """Read documents through a cursor: by async iteration, then as a bounded list."""
    client = motor.motor_asyncio.AsyncIOMotorClient()
    try:
        collection = client.test_database.test_collection

        # Async iteration
        async for document in collection.find({"age": {"$gte": 18}}):
            print(document)

        # To list with limit
        cursor = collection.find().limit(10)
        documents = await cursor.to_list(length=10)
        print(f"Found {len(documents)} documents")
    finally:
        # Close the client even when iteration or to_list raises.
        client.close()


import pymongo.errors
async def error_handling_example():
    """Insert the same ``_id`` twice and report the resulting error.

    The client is closed in ``finally`` whether or not an error occurred.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient()
    collection = client.test_database.test_collection
    try:
        await collection.insert_one({"_id": 1, "name": "test"})
        await collection.insert_one({"_id": 1, "name": "duplicate"})  # Will fail
    except pymongo.errors.DuplicateKeyError as e:
        print(f"Duplicate key error: {e}")
    except pymongo.errors.ConnectionFailure as e:
        print(f"Connection failed: {e}")
    finally:
        client.close()


# Callable is required by the with_transaction signature documented above.
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from datetime import datetime
# Placeholder aliases: each name is bound to Any in this listing.
AsyncIOMotorClientSession = Any  # Actual session type from motor
AsyncIOMotorCommandCursor = Any  # Command cursor type
AsyncIOMotorCursor = Any  # Query cursor type
AsyncIOMotorChangeStream = Any  # Change stream type

# Install with Tessl CLI
npx tessl i tessl/pypi-motor