AsyncIO MongoDB Object Document Mapper for Python using type hints
—
ODMantic provides both async and sync database engines for MongoDB operations. The AIOEngine is designed for async/await patterns with motor, while SyncEngine provides traditional synchronous operations with pymongo.
Async MongoDB operations engine using the motor driver for high-performance async applications.
class AIOEngine:
"""Asynchronous MongoDB engine using motor driver."""
def __init__(self, client=None, database="test"):
"""
Initialize async engine.
Args:
client: AsyncIOMotorClient instance, creates default if None
database: Database name to use (default: "test")
"""
def get_collection(self, model):
"""
Get motor collection for a model.
Args:
model: Model class
Returns:
AsyncIOMotorCollection: Motor collection object
"""
def session(self):
"""
Create new async session context manager.
Returns:
AIOSession: Session context manager
"""
def transaction(self):
"""
Create new async transaction context manager.
Returns:
AIOTransaction: Transaction context manager
"""
def find(self, model, *queries, sort=None, skip=0, limit=None, session=None):
"""
Find documents matching queries.
Args:
model: Model class to query
*queries: Query expressions to apply
sort: Sort expression or tuple of field proxies
skip: Number of documents to skip
limit: Maximum number of documents to return
session: Session for the operation
Returns:
AIOCursor: Async cursor for iteration
"""
async def find_one(self, model, *queries, sort=None, session=None):
"""
Find single document matching queries.
Args:
model: Model class to query
*queries: Query expressions to apply
sort: Sort expression or tuple of field proxies
session: Session for the operation
Returns:
Model instance or None if not found
"""
async def save(self, instance, *, session=None):
"""
Save model instance to database.
Args:
instance: Model instance to save
session: Session for the operation
Returns:
Model instance with updated fields
"""
async def save_all(self, instances, *, session=None):
"""
Save multiple model instances to database.
Args:
instances: List of model instances to save
session: Session for the operation
Returns:
List of model instances with updated fields
"""
async def delete(self, instance, *, session=None):
"""
Delete model instance from database.
Args:
instance: Model instance to delete
session: Session for the operation
"""
async def remove(self, model, *queries, just_one=False, session=None):
"""
Remove documents matching queries.
Args:
model: Model class to remove from
*queries: Query expressions to apply
just_one: Remove only the first matching document
session: Session for the operation
Returns:
int: Number of documents removed
"""
async def count(self, model, *queries, session=None):
"""
Count documents matching queries.
Args:
model: Model class to count
*queries: Query expressions to apply
session: Session for the operation
Returns:
int: Number of matching documents
"""
async def configure_database(self, models, session=None, update_existing_indexes=False):
"""
Configure database indexes for models.
Args:
models: List of model classes to configure
session: Session for the operation
update_existing_indexes: Update existing indexes if True
"""Synchronous MongoDB operations engine using the pymongo driver for traditional sync applications.
class SyncEngine:
"""Synchronous MongoDB engine using pymongo driver."""
def __init__(self, client=None, database="test"):
"""
Initialize sync engine.
Args:
client: MongoClient instance, creates default if None
database: Database name to use (default: "test")
"""
def get_collection(self, model):
"""
Get pymongo collection for a model.
Args:
model: Model class
Returns:
Collection: Pymongo collection object
"""
def session(self):
"""
Create new sync session context manager.
Returns:
SyncSession: Session context manager
"""
def transaction(self):
"""
Create new sync transaction context manager.
Returns:
SyncTransaction: Transaction context manager
"""
def find(self, model, *queries, sort=None, skip=0, limit=None, session=None):
"""
Find documents matching queries.
Args:
model: Model class to query
*queries: Query expressions to apply
sort: Sort expression or tuple of field proxies
skip: Number of documents to skip
limit: Maximum number of documents to return
session: Session for the operation
Returns:
SyncCursor: Sync cursor for iteration
"""
def find_one(self, model, *queries, sort=None, session=None):
"""
Find single document matching queries.
Args:
model: Model class to query
*queries: Query expressions to apply
sort: Sort expression or tuple of field proxies
session: Session for the operation
Returns:
Model instance or None if not found
"""
def save(self, instance, *, session=None):
"""
Save model instance to database.
Args:
instance: Model instance to save
session: Session for the operation
Returns:
Model instance with updated fields
"""
def save_all(self, instances, *, session=None):
"""
Save multiple model instances to database.
Args:
instances: List of model instances to save
session: Session for the operation
Returns:
List of model instances with updated fields
"""
def delete(self, instance, *, session=None):
"""
Delete model instance from database.
Args:
instance: Model instance to delete
session: Session for the operation
"""
def remove(self, model, *queries, just_one=False, session=None):
"""
Remove documents matching queries.
Args:
model: Model class to remove from
*queries: Query expressions to apply
just_one: Remove only the first matching document
session: Session for the operation
Returns:
int: Number of documents removed
"""
def count(self, model, *queries, session=None):
"""
Count documents matching queries.
Args:
model: Model class to count
*queries: Query expressions to apply
session: Session for the operation
Returns:
int: Number of matching documents
"""
def configure_database(self, models, session=None, update_existing_indexes=False):
"""
Configure database indexes for models.
Args:
models: List of model classes to configure
session: Session for the operation
update_existing_indexes: Update existing indexes if True
"""Cursors provide iteration over query results with different async/sync patterns.
class AIOCursor:
"""Async cursor supporting both async iteration and await."""
def __aiter__(self):
"""Async iterator protocol."""
def __await__(self):
"""Await to get list of all results."""
class SyncCursor:
"""Sync cursor supporting regular iteration."""
def __iter__(self):
"""Iterator protocol."""from odmantic import AIOEngine, Model
from motor.motor_asyncio import AsyncIOMotorClient
import asyncio
class User(Model):
name: str
email: str
async def example():
client = AsyncIOMotorClient("mongodb://localhost:27017")
engine = AIOEngine(client, database="mydb")
# Save a user
user = User(name="Alice", email="alice@example.com")
await engine.save(user)
# Find users
cursor = engine.find(User, User.name == "Alice")
async for user in cursor:
print(user.name)
# Or get all at once
users = await engine.find(User, User.name == "Alice")
print(f"Found {len(users)} users")
asyncio.run(example())from odmantic import SyncEngine, Model
from pymongo import MongoClient
class User(Model):
name: str
email: str
client = MongoClient("mongodb://localhost:27017")
engine = SyncEngine(client, database="mydb")
# Save a user
user = User(name="Bob", email="bob@example.com")
engine.save(user)
# Find users
cursor = engine.find(User, User.name == "Bob")
for user in cursor:
print(user.name)
# Count users
count = engine.count(User, User.name == "Bob")
print(f"Found {count} users")# Async session
async with engine.session() as session:
user = User(name="Charlie", email="charlie@example.com")
await engine.save(user, session=session)
users = await engine.find(User, session=session)
# Sync session
with engine.session() as session:
user = User(name="Dave", email="dave@example.com")
engine.save(user, session=session)
users = list(engine.find(User, session=session))Install with Tessl CLI
npx tessl i tessl/pypi-odmantic