CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-odmantic

AsyncIO MongoDB Object Document Mapper for Python using type hints

Pending
Overview
Eval results
Files

sessions.mddocs/

Sessions

ODMantic provides session and transaction support for both async and sync operations, enabling atomic operations, consistency guarantees, and proper resource management.

Capabilities

Async Session Management

Session context managers for async operations with proper resource cleanup.

class AIOSession:
    """Async session context manager for MongoDB operations."""
    
    async def __aenter__(self):
        """Enter async context manager."""
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async context manager with cleanup."""
    
    def get_driver_session(self):
        """
        Get underlying motor client session.
        
        Returns:
            AsyncIOMotorClientSession: Motor session object
        """

class AIOTransaction(AIOSession):
    """Async transaction context manager for atomic operations."""
    
    async def __aenter__(self):
        """Enter async transaction context."""
        
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit async transaction with commit/rollback."""
    
    def get_driver_session(self):
        """
        Get underlying motor client session.
        
        Returns:
            AsyncIOMotorClientSession: Motor session object
        """

Sync Session Management

Session context managers for sync operations with proper resource cleanup.

class SyncSession:
    """Sync session context manager for MongoDB operations."""
    
    def __enter__(self):
        """Enter sync context manager."""
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit sync context manager with cleanup."""
    
    def get_driver_session(self):
        """
        Get underlying pymongo client session.
        
        Returns:
            ClientSession: Pymongo session object
        """

class SyncTransaction(SyncSession):
    """Sync transaction context manager for atomic operations."""
    
    def __enter__(self):
        """Enter sync transaction context."""
        
    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit sync transaction with commit/rollback."""
    
    def get_driver_session(self):
        """
        Get underlying pymongo client session.
        
        Returns:
            ClientSession: Pymongo session object
        """

Engine Session Methods

Methods on engines to create session and transaction contexts.

# AIOEngine methods
def session(self):
    """
    Create async session context manager.
    
    Returns:
        AIOSession: Async session context manager
    """

def transaction(self):
    """
    Create async transaction context manager.
    
    Returns:
        AIOTransaction: Async transaction context manager
    """

# SyncEngine methods  
def session(self):
    """
    Create sync session context manager.
    
    Returns:
        SyncSession: Sync session context manager
    """

def transaction(self):
    """
    Create sync transaction context manager.
    
    Returns:
        SyncTransaction: Sync transaction context manager
    """

Usage Examples

Basic Session Usage (Async)

from odmantic import AIOEngine, Model
from motor.motor_asyncio import AsyncIOMotorClient

class User(Model):
    name: str
    email: str

class Order(Model):
    user_id: ObjectId
    total: float

async def session_example():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")
    
    # Basic session usage
    async with engine.session() as session:
        # All operations in this block use the same session
        user = User(name="John", email="john@example.com")
        await engine.save(user, session=session)
        
        # Find operations also use the session
        users = await engine.find(User, session=session)
        
        # Session ensures consistent reads
        user_count = await engine.count(User, session=session)
        print(f"Found {user_count} users")

Basic Session Usage (Sync)

from odmantic import SyncEngine, Model
from pymongo import MongoClient

def sync_session_example():
    client = MongoClient("mongodb://localhost:27017")
    engine = SyncEngine(client, database="mydb")
    
    # Basic session usage
    with engine.session() as session:
        # All operations use the same session
        user = User(name="Jane", email="jane@example.com")
        engine.save(user, session=session)
        
        # Consistent reads within session
        users = list(engine.find(User, session=session))
        user_count = engine.count(User, session=session)
        print(f"Found {user_count} users")

Transaction Usage (Async)

async def transaction_example():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")
    
    try:
        async with engine.transaction() as transaction:
            # Create a user
            user = User(name="Alice", email="alice@example.com")
            await engine.save(user, session=transaction)
            
            # Create an order for the user
            order = Order(user_id=user.id, total=99.99)
            await engine.save(order, session=transaction)
            
            # Update user with additional info
            user.model_update(total_orders=1)
            await engine.save(user, session=transaction)
            
            # If we reach here, transaction commits automatically
            print("Transaction completed successfully")
            
    except Exception as e:
        # Transaction automatically rolls back on exception
        print(f"Transaction failed: {e}")

Transaction Usage (Sync)

def sync_transaction_example():
    client = MongoClient("mongodb://localhost:27017")
    engine = SyncEngine(client, database="mydb")
    
    try:
        with engine.transaction() as transaction:
            # Create a user
            user = User(name="Bob", email="bob@example.com")
            engine.save(user, session=transaction)
            
            # Create an order for the user
            order = Order(user_id=user.id, total=149.99)
            engine.save(order, session=transaction)
            
            # Transaction commits automatically if no exception
            print("Transaction completed successfully")
            
    except Exception as e:
        # Transaction automatically rolls back on exception
        print(f"Transaction failed: {e}")

Multi-Collection Transactions

class Account(Model):
    user_id: ObjectId
    balance: float

class Transaction(Model):
    from_account: ObjectId
    to_account: ObjectId
    amount: float
    timestamp: datetime

async def money_transfer(engine: AIOEngine, from_id: ObjectId, to_id: ObjectId, amount: float):
    """Transfer money between accounts atomically."""
    
    async with engine.transaction() as txn:
        # Get source account
        from_account = await engine.find_one(Account, Account.user_id == from_id, session=txn)
        if not from_account or from_account.balance < amount:
            raise ValueError("Insufficient funds")
        
        # Get destination account
        to_account = await engine.find_one(Account, Account.user_id == to_id, session=txn)
        if not to_account:
            raise ValueError("Destination account not found")
        
        # Update balances
        from_account.model_update(balance=from_account.balance - amount)
        to_account.model_update(balance=to_account.balance + amount)
        
        # Save updated accounts
        await engine.save(from_account, session=txn)
        await engine.save(to_account, session=txn)
        
        # Record transaction
        transaction = Transaction(
            from_account=from_id,
            to_account=to_id,
            amount=amount,
            timestamp=datetime.utcnow()
        )
        await engine.save(transaction, session=txn)
        
        print(f"Transferred ${amount} from {from_id} to {to_id}")

Session with Error Handling

async def robust_session_example():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")
    
    session = None
    try:
        async with engine.session() as session:
            # Perform multiple operations
            users = []
            for i in range(10):
                user = User(name=f"User {i}", email=f"user{i}@example.com")
                await engine.save(user, session=session)
                users.append(user)
            
            # Verify all users were created
            total_users = await engine.count(User, session=session)
            print(f"Created {len(users)} users, total in DB: {total_users}")
            
    except Exception as e:
        print(f"Session failed: {e}")
        # Session cleanup happens automatically

Nested Session Handling

async def nested_operations(engine: AIOEngine, session=None):
    """Function that can work with or without a session."""
    
    # Use provided session or create a new one
    if session:
        # Use existing session
        user = User(name="Nested User", email="nested@example.com")
        await engine.save(user, session=session)
        return user
    else:
        # Create new session
        async with engine.session() as new_session:
            user = User(name="New Session User", email="newsession@example.com")
            await engine.save(user, session=new_session)
            return user

async def caller_example():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")
    
    # Call without session - function creates its own
    user1 = await nested_operations(engine)
    
    # Call with session - function uses provided session
    async with engine.session() as session:
        user2 = await nested_operations(engine, session=session)
        user3 = await nested_operations(engine, session=session)
        
        # All operations in this session are consistent
        count = await engine.count(User, session=session)
        print(f"Users in session: {count}")

Transaction Rollback Examples

async def transaction_rollback_example():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")
    
    # Example 1: Explicit rollback
    try:
        async with engine.transaction() as txn:
            user = User(name="Test User", email="test@example.com")
            await engine.save(user, session=txn)
            
            # Some condition that requires rollback
            if user.name == "Test User":
                raise ValueError("Test users not allowed")
                
    except ValueError as e:
        print(f"Transaction rolled back: {e}")
    
    # Example 2: Automatic rollback on any exception
    try:
        async with engine.transaction() as txn:
            # This will all be rolled back
            for i in range(5):
                user = User(name=f"User {i}", email=f"user{i}@example.com")
                await engine.save(user, session=txn)
            
            # Simulate an error
            raise RuntimeError("Something went wrong")
            
    except RuntimeError as e:
        print(f"All operations rolled back: {e}")
    
    # Verify no users were created
    count = await engine.count(User)
    print(f"Total users after rollback: {count}")

Session Configuration

async def session_configuration_examples():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    engine = AIOEngine(client, database="mydb")
    
    # Sessions automatically use appropriate read/write concerns
    # based on MongoDB configuration and replica set setup
    
    async with engine.session() as session:
        # Session inherits client's read preference, write concern, etc.
        users = await engine.find(User, session=session)
        
    async with engine.transaction() as txn:
        # Transactions automatically use appropriate concerns for ACID compliance
        user = User(name="Transactional User", email="txn@example.com")
        await engine.save(user, session=txn)

Install with Tessl CLI

npx tessl i tessl/pypi-odmantic

docs

bson-types.md

engines.md

fields.md

index.md

indexes.md

models.md

queries.md

sessions.md

tile.json