AsyncIO MongoDB Object Document Mapper for Python using type hints
—
ODMantic provides session and transaction support for both async and sync operations, enabling atomic operations, consistency guarantees, and proper resource management.
Session context managers for async operations with proper resource cleanup.
class AIOSession:
    """Async session context manager for MongoDB operations.

    API outline only: the method bodies are intentionally omitted here.
    """

    async def __aenter__(self):
        """Enter async context manager."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async context manager with cleanup."""

    def get_driver_session(self):
        """
        Get underlying motor client session.

        Returns:
            AsyncIOMotorClientSession: Motor session object
        """
class AIOTransaction(AIOSession):
    """Async transaction context manager for atomic operations.

    API outline only: the method bodies are intentionally omitted here.
    """

    async def __aenter__(self):
        """Enter async transaction context."""

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async transaction with commit/rollback."""

    def get_driver_session(self):
        """
        Get underlying motor client session.

        Returns:
            AsyncIOMotorClientSession: Motor session object
        """


# Session context managers for sync operations with proper resource cleanup.
class SyncSession:
    """Sync session context manager for MongoDB operations.

    API outline only: the method bodies are intentionally omitted here.
    """

    def __enter__(self):
        """Enter sync context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit sync context manager with cleanup."""

    def get_driver_session(self):
        """
        Get underlying pymongo client session.

        Returns:
            ClientSession: Pymongo session object
        """
class SyncTransaction(SyncSession):
    """Sync transaction context manager for atomic operations.

    API outline only: the method bodies are intentionally omitted here.
    """

    def __enter__(self):
        """Enter sync transaction context."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit sync transaction with commit/rollback."""

    def get_driver_session(self):
        """
        Get underlying pymongo client session.

        Returns:
            ClientSession: Pymongo session object
        """


# Methods on engines to create session and transaction contexts.
# AIOEngine methods
def session(self):
    """
    Create async session context manager.

    Returns:
        AIOSession: Async session context manager
    """
def transaction(self):
    """
    Create async transaction context manager.

    Returns:
        AIOTransaction: Async transaction context manager
    """
# SyncEngine methods
def session(self):
    """
    Create sync session context manager.

    Returns:
        SyncSession: Sync session context manager
    """
def transaction(self):
    """
    Create sync transaction context manager.

    Returns:
        SyncTransaction: Sync transaction context manager
    """


from odmantic import AIOEngine, Model
from datetime import datetime

from motor.motor_asyncio import AsyncIOMotorClient
from odmantic import ObjectId
class User(Model):
    """User document with a name and an email address."""

    name: str
    email: str
class Order(Model):
    """Order document holding the owning user's id and the order total."""

    user_id: ObjectId
    total: float
async def session_example():
    """Run a save, a find and a count through one shared async session."""
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")

    # Basic session usage
    async with engine.session() as session:
        # All operations in this block use the same session
        user = User(name="John", email="john@example.com")
        await engine.save(user, session=session)

        # Find operations also use the session
        users = await engine.find(User, session=session)

        # Session ensures consistent reads
        user_count = await engine.count(User, session=session)
        print(f"Found {user_count} users")


from odmantic import SyncEngine, Model
from pymongo import MongoClient
def sync_session_example():
    """Run a save, a find and a count through one shared sync session."""
    client = MongoClient("mongodb://localhost:27017")
    engine = SyncEngine(client, database="mydb")

    # Basic session usage
    with engine.session() as session:
        # All operations use the same session
        user = User(name="Jane", email="jane@example.com")
        engine.save(user, session=session)

        # Consistent reads within session
        users = list(engine.find(User, session=session))
        user_count = engine.count(User, session=session)
        print(f"Found {user_count} users")


async def transaction_example():
    """Save a user and an order atomically inside one async transaction.

    NOTE: MongoDB multi-document transactions need a replica set or a
    sharded cluster; a standalone server rejects them.
    """
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")

    try:
        async with engine.transaction() as transaction:
            # Create a user
            user = User(name="Alice", email="alice@example.com")
            await engine.save(user, session=transaction)

            # Create an order for the user
            order = Order(user_id=user.id, total=99.99)
            await engine.save(order, session=transaction)

            # Update the user. model_update() takes the patch as ONE
            # positional dict (or model), not as keyword arguments, and the
            # patched field must exist on the model.
            user.model_update({"email": "alice.updated@example.com"})
            await engine.save(user, session=transaction)

        # Leaving the block without an exception commits the transaction
        print("Transaction completed successfully")
    except Exception as e:
        # Transaction automatically rolls back on exception
        print(f"Transaction failed: {e}")


def sync_transaction_example():
    """Save a user and an order atomically inside one sync transaction."""
    client = MongoClient("mongodb://localhost:27017")
    engine = SyncEngine(client, database="mydb")

    try:
        with engine.transaction() as transaction:
            # Create a user
            user = User(name="Bob", email="bob@example.com")
            engine.save(user, session=transaction)

            # Create an order for the user
            order = Order(user_id=user.id, total=149.99)
            engine.save(order, session=transaction)

        # Transaction commits automatically if no exception
        print("Transaction completed successfully")
    except Exception as e:
        # Transaction automatically rolls back on exception
        print(f"Transaction failed: {e}")


class Account(Model):
    """Account document: the owning user's id and the current balance."""

    user_id: ObjectId
    balance: float


class Transaction(Model):
    """Record of one transfer between two accounts."""

    from_account: ObjectId
    to_account: ObjectId
    amount: float
    timestamp: datetime


async def money_transfer(engine: AIOEngine, from_id: ObjectId, to_id: ObjectId, amount: float):
    """Transfer money between accounts atomically.

    Both balance updates and the transfer record are written inside a single
    transaction, so any exception raised here rolls all of them back.

    Args:
        engine: Engine used to open the transaction and run the queries.
        from_id: ``user_id`` of the account to debit.
        to_id: ``user_id`` of the account to credit.
        amount: Amount to move; must be strictly positive.

    Raises:
        ValueError: If the amount is not positive, an account is missing,
            or the source balance is too low.
    """
    # A zero or negative amount would bypass the funds check below and move
    # money in the wrong direction, so reject it before touching the database.
    if amount <= 0:
        raise ValueError("Transfer amount must be positive")

    async with engine.transaction() as txn:
        # Get source account
        from_account = await engine.find_one(Account, Account.user_id == from_id, session=txn)
        if from_account is None:
            raise ValueError("Source account not found")
        if from_account.balance < amount:
            raise ValueError("Insufficient funds")

        # Get destination account
        to_account = await engine.find_one(Account, Account.user_id == to_id, session=txn)
        if to_account is None:
            raise ValueError("Destination account not found")

        # Update balances; model_update() expects a single patch dict
        from_account.model_update({"balance": from_account.balance - amount})
        to_account.model_update({"balance": to_account.balance + amount})

        # Save updated accounts
        await engine.save(from_account, session=txn)
        await engine.save(to_account, session=txn)

        # Record transaction
        transaction = Transaction(
            from_account=from_id,
            to_account=to_id,
            amount=amount,
            timestamp=datetime.utcnow(),
        )
        await engine.save(transaction, session=txn)

    print(f"Transferred ${amount} from {from_id} to {to_id}")


async def robust_session_example():
    """Create ten users in one session and report any failure."""
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")

    try:
        async with engine.session() as session:
            # Perform multiple operations
            users = []
            for i in range(10):
                user = User(name=f"User {i}", email=f"user{i}@example.com")
                await engine.save(user, session=session)
                users.append(user)

            # Verify all users were created
            total_users = await engine.count(User, session=session)
            print(f"Created {len(users)} users, total in DB: {total_users}")
    except Exception as e:
        print(f"Session failed: {e}")
    # Session cleanup happens automatically


async def nested_operations(engine: AIOEngine, session=None):
    """Function that can work with or without a session.

    Args:
        engine: Engine used to save the user.
        session: Existing session to reuse; when ``None`` a new session is
            opened for the duration of the call.

    Returns:
        The saved ``User`` instance.
    """
    if session is not None:
        # Use existing session
        user = User(name="Nested User", email="nested@example.com")
        await engine.save(user, session=session)
        return user

    # Create new session
    async with engine.session() as new_session:
        user = User(name="New Session User", email="newsession@example.com")
        await engine.save(user, session=new_session)
        return user


async def caller_example():
    """Call ``nested_operations`` both without and with a shared session."""
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")

    # Call without session - function creates its own
    user1 = await nested_operations(engine)

    # Call with session - function uses provided session
    async with engine.session() as session:
        user2 = await nested_operations(engine, session=session)
        user3 = await nested_operations(engine, session=session)

        # All operations in this session are consistent
        count = await engine.count(User, session=session)
        print(f"Users in session: {count}")


async def transaction_rollback_example():
    """Show that raising inside a transaction block rolls its writes back."""
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")

    # Example 1: Rollback triggered by raising from inside the block
    try:
        async with engine.transaction() as txn:
            user = User(name="Test User", email="test@example.com")
            await engine.save(user, session=txn)

            # Some condition that requires rollback
            if user.name == "Test User":
                raise ValueError("Test users not allowed")
    except ValueError as e:
        print(f"Transaction rolled back: {e}")

    # Example 2: Automatic rollback on any exception
    try:
        async with engine.transaction() as txn:
            # This will all be rolled back
            for i in range(5):
                user = User(name=f"User {i}", email=f"user{i}@example.com")
                await engine.save(user, session=txn)

            # Simulate an error
            raise RuntimeError("Something went wrong")
    except RuntimeError as e:
        print(f"All operations rolled back: {e}")

    # Verify no users were created
    count = await engine.count(User)
    print(f"Total users after rollback: {count}")


async def session_configuration_examples():
    """Open a plain session and a transaction with the client's defaults."""
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")

    # Sessions automatically use appropriate read/write concerns
    # based on MongoDB configuration and replica set setup
    async with engine.session() as session:
        # Session inherits client's read preference, write concern, etc.
        users = await engine.find(User, session=session)

    async with engine.transaction() as txn:
        # Transactions automatically use appropriate concerns for ACID compliance
        user = User(name="Transactional User", email="txn@example.com")
        await engine.save(user, session=txn)


# Install with Tessl CLI
# npx tessl i tessl/pypi-odmantic