Asynchronous Python ODM for MongoDB with modern Pydantic-based document mapping
npx @tessl/cli install tessl/pypi-beanie@2.0.0

An asynchronous Python object-document mapper (ODM) for MongoDB that provides a modern, Pydantic-based approach to database interactions. Built on top of PyMongo's async client and Pydantic models, Beanie offers a clean, type-safe API for document operations, including CRUD operations, aggregation pipelines, and complex queries with Pythonic syntax.
pip install beanie

import beanie

Common patterns for working with documents:
from beanie import Document, init_beanie

Import specific components:
from beanie import (
Document, DocumentWithSoftDelete, View, UnionDoc,
init_beanie, PydanticObjectId, Indexed, Link, BackLink,
before_event, after_event, Insert, Save, Update, Delete,
BulkWriter, iterative_migration, free_fall_migration
)

import asyncio
from beanie import Document, init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from pydantic import BaseModel
# Define a document model
class User(Document):
    """Example user document stored in the ``users`` collection."""

    name: str
    email: str
    age: int = 0

    class Settings:
        # Beanie takes the collection name from ``Settings.name``; an
        # attribute called ``collection`` is not a recognised setting, so the
        # document would fall back to the default collection name.
        name = "users"
# Initialize Beanie
async def init_db():
    """Open a client to the local MongoDB server and register ``User`` with Beanie."""
    # NOTE(review): the overview describes Beanie as built on PyMongo's async
    # client, while this example connects through Motor - confirm which client
    # the documented release expects.
    mongo = AsyncIOMotorClient("mongodb://localhost:27017")
    await init_beanie(database=mongo.myapp, document_models=[User])
# Basic CRUD operations
async def main():
    """Run a create / read / update / delete round trip for ``User``."""
    await init_db()

    # Create a new user
    user = User(name="John Doe", email="john@example.com", age=30)
    await user.insert()

    # Find users
    users = await User.find_all().to_list()
    found = await User.find_one(User.email == "john@example.com")
    # find_one is declared as returning Optional[Document]; stop here rather
    # than calling update()/delete() on None when nothing matched.
    if found is None:
        return

    # Update user
    await found.update({"$set": {"age": 31}})

    # Delete user
    await found.delete()
if __name__ == "__main__":
    asyncio.run(main())

Beanie follows a layered architecture built on MongoDB and Pydantic:
This design provides full MongoDB feature access while maintaining Python type safety and async performance, making it ideal for modern FastAPI applications and microservices.
Core document mapping classes for different MongoDB collection patterns, providing CRUD operations, query interfaces, and lifecycle management.
class Document(BaseModel):
    """Core document mapping class: CRUD operations, query interfaces, and state management.

    This is a signature listing only; every method body is elided with ``...``.
    """

    id: Optional[PydanticObjectId] = None
    revision_id: Optional[UUID] = None

    # Core CRUD operations (instance methods; each accepts an optional session)
    async def insert(self, session: Optional[AsyncClientSession] = None, **kwargs) -> "Document": ...
    async def save(self, session: Optional[AsyncClientSession] = None, **kwargs) -> "Document": ...
    async def save_changes(self, session: Optional[AsyncClientSession] = None, **kwargs) -> Optional["Document"]: ...
    async def update(self, *args, session: Optional[AsyncClientSession] = None, **kwargs) -> None: ...
    async def replace(self, session: Optional[AsyncClientSession] = None, **kwargs) -> "Document": ...
    async def delete(self, session: Optional[AsyncClientSession] = None, **kwargs) -> None: ...

    # Class methods
    @classmethod
    async def get(cls, document_id: Any, **kwargs) -> Optional["Document"]: ...

    @classmethod
    async def find_one(cls, filter_query: Optional[Dict] = None, **kwargs) -> Optional["Document"]: ...

    # find() and aggregate() are plain (non-async) methods that return query
    # interface objects rather than results.
    @classmethod
    def find(cls, filter_query: Optional[Dict] = None, **kwargs) -> "FindInterface": ...

    @classmethod
    def aggregate(cls, pipeline: List[Dict], **kwargs) -> "AggregateInterface": ...

    @classmethod
    async def insert_many(cls, documents: Iterable["Document"], **kwargs) -> InsertManyResult: ...

    @classmethod
    async def delete_all(cls, **kwargs) -> DeleteResult: ...

    @classmethod
    async def distinct(cls, key: str, **kwargs) -> List[Any]: ...

    # State management
    @property
    def is_changed(self) -> bool: ...

    def get_changes(self) -> Dict[str, Any]: ...

    def rollback(self) -> None: ...
class DocumentWithSoftDelete(Document):
    """Document variant that carries a ``deleted_at`` timestamp.

    Signature listing only; method bodies are elided.
    """

    # NOTE(review): presumably set by delete() and left as None for live
    # documents - confirm against the library.
    deleted_at: Optional[datetime] = None

    async def delete(self) -> None: ...

    async def hard_delete(self) -> None: ...

    def is_deleted(self) -> bool: ...
class View(BaseModel):
    """Mapping class exposing only query entry points (``find`` and ``aggregate``).

    Signature listing only; method bodies are elided.
    """

    @classmethod
    def find(cls, *args, **kwargs) -> "FindInterface": ...

    @classmethod
    def aggregate(cls, pipeline: List[Dict]) -> "AggregateInterface": ...
class UnionDoc:
@classmethod
async def register_doc(cls, doc_model: Type[Document]) -> None: ...
@classmethod
def find(cls, *args, **kwargs) -> "FindInterface": ...

Type-safe field definitions with MongoDB-specific types, indexing capabilities, and document relationships with lazy loading support.
class PydanticObjectId(ObjectId):
    """``ObjectId`` subclass used as the document identifier field type.

    Signature listing only; method bodies are elided.
    """

    def __init__(self, value: Union[str, ObjectId, None] = None): ...

    def __str__(self) -> str: ...


# Alternative name bound to the same class.
BeanieObjectId = PydanticObjectId
def Indexed(annotation, index_type=None, unique=False, sparse=False, **kwargs):
    """Create indexed field annotation for automatic index creation.

    Signature listing only; the body is elided.
    """
    ...
class Link(Generic[T]):
    """Reference from one document to another, built from a ``DBRef`` and a target class.

    Signature listing only; method bodies are elided.
    """

    def __init__(self, ref: DBRef, document_class: Type[T]): ...

    # Both fetch helpers may return either the target document or the Link
    # itself, per their declared return types.
    async def fetch(self, fetch_links: bool = False) -> Union[T, "Link[T]"]: ...

    @classmethod
    async def fetch_list(cls, links: List[Union["Link[T]", T]], fetch_links: bool = False) -> List[Union[T, "Link[T]"]]: ...

    def to_dict(self) -> Dict[str, str]: ...
class BackLink(Generic[T]):
def __init__(self, document_class: Type[T], original_field: str): ...

Pre/post event hooks for document lifecycle events with action type constants for flexible business logic integration.
# Event decorators
def before_event(*actions):
    """Decorator to register functions that run before document events.

    Signature listing only; the body is elided.
    """
    ...
def after_event(*actions):
    """Decorator to register functions that run after document events.

    Signature listing only; the body is elided.
    """
    ...
# Action type constants
class Insert:
    """Action type marker: insert."""


class Replace:
    """Action type marker: replace."""


class Save:
    """Action type marker: save."""


class SaveChanges:
    """Action type marker: save changes."""


class ValidateOnSave:
    """Action type marker: validate on save."""


class Delete:
    """Action type marker: delete."""


class Update:
    """Action type marker: update."""


# Action direction constants
class Before:
    """Action direction marker: before."""
class After: ...

Efficient bulk write operations with transaction support for high-performance batch processing.
class BulkWriter:
def __init__(self, *args, **kwargs): ...
async def __aenter__(self) -> "BulkWriter": ...
async def __aexit__(self, exc_type, exc_val, exc_tb): ...
def add_operation(self, operation: Dict) -> None: ...
async def commit(self) -> Any: ...

Schema migration tools for iterative and free-form data migrations with batch processing support.
def iterative_migration(document_models: List[Type[Document]], batch_size: int = 1000):
    """Decorator for creating iterative data migrations with batching.

    Signature listing only; the body is elided.
    """
    ...
def free_fall_migration(document_models: List[Type[Document]]):
"""Decorator for creating free-form migration functions."""
...

Query response configuration and utility enumerations for sorting and document merging strategies.
class UpdateResponse(Enum):
    """Query response configuration: selects among three response kinds."""

    UPDATE_RESULT = "update_result"
    OLD_DOCUMENT = "old_document"
    NEW_DOCUMENT = "new_document"
class SortDirection(Enum):
    """Sort direction, encoded as ``1`` (ascending) or ``-1`` (descending)."""

    ASCENDING = 1
    DESCENDING = -1
class MergeStrategy(Enum):
local = "local"
remote = "remote"

MongoDB time series collection configuration with granularity control and TTL support.
class TimeSeriesConfig(BaseModel):
    """Configuration model for a MongoDB time series collection."""

    # Only time_field is required; the remaining options default to None.
    time_field: str
    meta_field: Optional[str] = None
    # NOTE(review): Granularity is listed after this class in this reference;
    # in a real module it must be defined (or forward-referenced) first.
    granularity: Optional[Granularity] = None
    expire_after_seconds: Optional[int] = None
class Granularity(Enum):
seconds = "seconds"
minutes = "minutes"
hours = "hours"

Pydantic-compatible custom field types for MongoDB-specific data types like Decimal128 and Binary.
# Declaration-only listing: the name is annotated as a TypeAlias but no target
# type is assigned here.
DecimalAnnotation: TypeAlias
"""Pydantic annotation for handling BSON Decimal128 types."""
BsonBinary: TypeAlias
"""Pydantic-compatible BSON binary field type."""

Database and ODM initialization with connection management and model registration.
async def init_beanie(
    database,
    document_models: List[Union[Type[Document], Type[View], str]],
    connection_string: Optional[str] = None,
    allow_index_dropping: bool = False,
    recreate_views: bool = False,
    skip_indexes: bool = False
) -> None:
    """Initialize Beanie ODM with database connection and document models.

    Signature listing only; the body is elided.

    Args:
        database: Database handle to bind the models to.
        document_models: Document or view classes to register; entries may
            also be strings (presumably import paths - confirm).
        connection_string: Optional connection string.
        allow_index_dropping: Defaults to False.
        recreate_views: Defaults to False.
        skip_indexes: Defaults to False.
    """
    ...


# Common type aliases
# Type variable restricted to Document and its subclasses.
DocumentType = TypeVar("DocumentType", bound=Document)
# The query interface names are declared here as unconstrained type variables;
# the signatures in this reference use them as quoted return annotations.
FindInterface = TypeVar("FindInterface")
AggregateInterface = TypeVar("AggregateInterface")
# Rule enumerations
class WriteRules(Enum):
    """Rule enumeration for writes: ``DO_NOTHING`` or ``WRITE``."""

    DO_NOTHING = "DO_NOTHING"
    WRITE = "WRITE"
class DeleteRules(Enum):
    """Rule enumeration for deletes: ``DO_NOTHING`` or ``DELETE_LINKS``."""

    DO_NOTHING = "DO_NOTHING"
    DELETE_LINKS = "DELETE_LINKS"