Asynchronous Python ODM for MongoDB with modern Pydantic-based document mapping
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Core document mapping classes that provide the foundation for MongoDB collection interactions in Beanie. These classes extend Pydantic models with MongoDB-specific functionality, offering CRUD operations, query interfaces, and lifecycle management.
The primary document mapping class that represents a MongoDB document with full CRUD operations and query capabilities.
class Document(BaseModel):
id: Optional[PydanticObjectId] = None
revision_id: Optional[UUID] = None
# Instance methods for CRUD operations
async def insert(
self,
*,
session: Optional[AsyncClientSession] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
**kwargs
) -> "Document":
"""Insert this document into the collection."""
...
async def create(
self,
session: Optional[AsyncClientSession] = None,
**kwargs
) -> "Document":
"""Alias for insert() method."""
...
async def save(
self,
session: Optional[AsyncClientSession] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
**kwargs
) -> "Document":
"""Insert or update this document."""
...
async def save_changes(
self,
ignore_revision: bool = False,
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
) -> Optional["Document"]:
"""Save only the changes made to this document."""
...
async def update(
self,
*args: Union[Dict[Any, Any], Mapping[Any, Any]],
ignore_revision: bool = False,
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
skip_sync: bool = False,
**kwargs
) -> None:
"""Update this document with the given update query."""
...
async def replace(
self,
ignore_revision: bool = False,
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
**kwargs
) -> "Document":
"""Replace this document entirely in the collection."""
...
async def delete(
self,
session: Optional[AsyncClientSession] = None,
bulk_writer: Optional[BulkWriter] = None,
link_rule: DeleteRules = DeleteRules.DO_NOTHING,
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
**kwargs
) -> None:
"""Delete this document from the collection."""
...
async def sync(self, merge_strategy: MergeStrategy = MergeStrategy.remote) -> None:
"""Sync document with database using specified merge strategy."""
...
# Class methods for single document operations
@classmethod
async def get(
cls,
document_id: Any,
session: Optional[AsyncClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**kwargs
) -> Optional["Document"]:
"""Get a document by its ID."""
...
@classmethod
async def insert_one(
cls,
document: "Document",
session: Optional[AsyncClientSession] = None,
**kwargs
) -> "Document":
"""Insert a single document."""
...
@classmethod
async def insert_many(
cls,
documents: Iterable["Document"],
session: Optional[AsyncClientSession] = None,
link_rule: WriteRules = WriteRules.DO_NOTHING,
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
**kwargs
) -> InsertManyResult:
"""Insert multiple documents."""
...
@classmethod
async def replace_many(
cls,
documents: List["Document"],
session: Optional[AsyncClientSession] = None,
**kwargs
) -> None:
"""Replace multiple documents."""
...
# Class methods for querying
@classmethod
async def find_one(
cls,
filter_query: Optional[Union[Dict, bool]] = None,
session: Optional[AsyncClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**kwargs
) -> Optional["Document"]:
"""Find a single document matching the filter."""
...
@classmethod
def find(
cls,
filter_query: Optional[Union[Dict, bool]] = None,
skip: Optional[int] = None,
limit: Optional[int] = None,
session: Optional[AsyncClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
nesting_depth: Optional[int] = None,
nesting_depths_per_field: Optional[Dict[str, int]] = None,
**kwargs
) -> "FindInterface":
"""Find documents matching the filter."""
...
@classmethod
def find_all(
cls,
skip: Optional[int] = None,
limit: Optional[int] = None,
session: Optional[AsyncClientSession] = None,
ignore_cache: bool = False,
fetch_links: bool = False,
with_children: bool = False,
lazy_parse: bool = False,
**kwargs
) -> "FindInterface":
"""Find all documents in the collection."""
...
@classmethod
def aggregate(
cls,
pipeline: List[Dict],
projection_model: Optional[Type[BaseModel]] = None,
session: Optional[AsyncClientSession] = None,
ignore_cache: bool = False,
**kwargs
) -> "AggregateInterface":
"""Run an aggregation pipeline."""
...
@classmethod
async def count_documents(
cls,
filter_query: Optional[Dict] = None,
session: Optional[AsyncClientSession] = None,
**kwargs
) -> int:
"""Count documents matching the filter."""
...
@classmethod
async def distinct(
cls,
key: str,
filter_query: Optional[Dict] = None,
session: Optional[AsyncClientSession] = None,
**kwargs
) -> List[Any]:
"""Get distinct values for a field."""
...
# Class methods for bulk operations
@classmethod
def update_all(
cls,
update_query: Dict[str, Any],
session: Optional[AsyncClientSession] = None,
**kwargs
) -> UpdateMany:
"""Update all documents matching the filter."""
...
@classmethod
async def delete_all(
cls,
session: Optional[AsyncClientSession] = None,
**kwargs
) -> DeleteResult:
"""Delete all documents in the collection."""
...
@classmethod
def bulk_writer(
cls,
session: Optional[AsyncClientSession] = None,
**kwargs
) -> BulkWriter:
"""Get a bulk writer for this document class."""
...
# State management methods
@classmethod
def use_state_management(cls) -> bool:
"""Check if state management is enabled for this document class."""
...
@classmethod
def state_management_save_previous(cls) -> bool:
"""Check if previous state saving is enabled."""
...
@classmethod
def state_management_replace_objects(cls) -> bool:
"""Check if object replacement is enabled in state management."""
...
def get_saved_state(self) -> Optional[Dict[str, Any]]:
"""Get the saved state of this document."""
...
def get_previous_saved_state(self) -> Optional[Dict[str, Any]]:
"""Get the previous saved state of this document."""
...
@property
def is_changed(self) -> bool:
"""Check if document has any unsaved changes."""
...
@property
def has_changed(self) -> bool:
"""Check if document has changed since last save."""
...
def get_changes(self) -> Dict[str, Any]:
"""Get a dictionary of all changes made to this document."""
...
def get_previous_changes(self) -> Dict[str, Any]:
"""Get the previous changes made to this document."""
...
def rollback(self) -> None:
"""Rollback document to its saved state."""
...
# Link management methods
async def fetch_link(
self,
item: Union[Link, BackLink, str],
session: Optional[AsyncClientSession] = None,
**kwargs
) -> None:
"""Fetch a specific link field."""
...
async def fetch_all_links(
self,
session: Optional[AsyncClientSession] = None,
**kwargs
) -> None:
"""Fetch all link fields in this document."""
...
def to_ref(self) -> DBRef:
"""Convert this document to a MongoDB DBRef."""
...
@classmethod
def link_from_id(cls, object_id: PydanticObjectId) -> Link:
"""Create a Link object from an ObjectId."""
...
# Collection management
@classmethod
async def inspect_collection(
cls,
session: Optional[AsyncClientSession] = None
) -> InspectionResult:
"""Inspect the collection for validation and integrity."""
...
@classmethod
def get_settings(cls) -> DocumentSettings:
"""Get the settings configuration for this document class."""
...
# Update operators (convenience methods)
def set(self, expression: Dict[Union[ExpressionField, str], Any]) -> SetOperator:
"""Create a Set update operator."""
...
def current_date(
self,
expression: Dict[Union[ExpressionField, str], Optional[str]]
) -> CurrentDate:
"""Create a CurrentDate update operator."""
...
def inc(self, expression: Dict[Union[ExpressionField, str], Union[int, float]]) -> Inc:
"""Create an Inc update operator."""
...
# Settings configuration
class Settings:
collection: Optional[str] = None
name: Optional[str] = None
indexes: Optional[List] = None
use_state_management: bool = False
validate_on_save: bool = False
use_revision_id: bool = False
use_enum_values: bool = False
bson_encoders: Optional[Dict] = None
lazy_parsing: bool = False
is_root: bool = False
union_doc: Optional[Type] = None
inheritance: bool = False
timeseries: Optional[TimeSeriesConfig] = None# Define a document model
class User(Document):
name: str
email: str
age: int = 0
active: bool = True
class Settings:
collection = "users"
use_state_management = True
# Create and insert
user = User(name="Alice", email="alice@example.com", age=25)
await user.insert()
# Find operations
user = await User.find_one(User.email == "alice@example.com")
users = await User.find(User.age > 18).to_list()
active_users = await User.find(User.active == True).to_list()
# Update operations
await user.update({"$set": {"age": 26}})
await user.update({"$inc": {"age": 1}})
# Replace and delete
user.age = 30
await user.replace()
await user.delete()Document class that implements soft delete functionality, marking documents as deleted rather than removing them permanently.
class DocumentWithSoftDelete(Document):
deleted_at: Optional[datetime] = None
async def delete(self, **kwargs) -> None:
"""Soft delete by setting deleted_at timestamp."""
...
async def hard_delete(self, **kwargs) -> None:
"""Permanently delete the document from the collection."""
...
def is_deleted(self) -> bool:
"""Check if document is soft deleted."""
...
@classmethod
def find_many_in_all(cls, filter_query: Optional[Dict] = None, **kwargs) -> "FindInterface":
"""Find documents including soft deleted ones."""
...
@classmethod
def get_deleted(cls, **kwargs) -> "FindInterface":
"""Get only soft deleted documents."""
...class SoftUser(DocumentWithSoftDelete):
name: str
email: str
class Settings:
collection = "soft_users"
user = SoftUser(name="Bob", email="bob@example.com")
await user.insert()
# Soft delete - sets deleted_at timestamp
await user.delete()
print(user.is_deleted()) # True
# Find excluding deleted
active_users = await SoftUser.find_all().to_list()
# Find including deleted
all_users = await SoftUser.find_many_in_all().to_list()
# Permanently delete
await user.hard_delete()Read-only mapping class for MongoDB views that provides query functionality without modification operations.
class View(BaseModel):
id: Optional[PydanticObjectId] = None
@classmethod
async def find_one(cls, filter_query: Optional[Dict] = None, **kwargs) -> Optional["View"]:
"""Find a single document in the view."""
...
@classmethod
def find(cls, filter_query: Optional[Dict] = None, **kwargs) -> "FindInterface":
"""Find documents in the view."""
...
@classmethod
def find_all(cls, **kwargs) -> "FindInterface":
"""Find all documents in the view."""
...
@classmethod
def aggregate(cls, pipeline: List[Dict], **kwargs) -> "AggregateInterface":
"""Run an aggregation pipeline on the view."""
...
@classmethod
async def count_documents(cls, filter_query: Optional[Dict] = None) -> int:
"""Count documents in the view."""
...
# Settings configuration
class Settings:
source: Type[Document]
pipeline: List[Dict]
name: Optional[str] = None# Define a view based on User documents
class ActiveUserView(View):
name: str
email: str
age: int
class Settings:
source = User
pipeline = [
{"$match": {"active": True}},
{"$project": {"name": 1, "email": 1, "age": 1}}
]
# Query the view (read-only)
active_users = await ActiveUserView.find_all().to_list()
young_users = await ActiveUserView.find(ActiveUserView.age < 30).to_list()Base class for handling multiple document types within a single collection, enabling polymorphic document storage.
class UnionDoc:
@classmethod
def register_doc(cls, name: str, doc_model: Type[Document]) -> None:
"""Register a document model with this union using a specific name."""
...
@classmethod
def find(
cls,
filter_query: Optional[Dict] = None,
session: Optional[AsyncClientSession] = None,
**kwargs
) -> "FindInterface":
"""Find documents across all registered types."""
...
@classmethod
def find_all(
cls,
session: Optional[AsyncClientSession] = None,
**kwargs
) -> "FindInterface":
"""Find all documents across all registered types."""
...
@classmethod
def aggregate(
cls,
pipeline: List[Dict],
session: Optional[AsyncClientSession] = None,
**kwargs
) -> "AggregateInterface":
"""Run aggregation across all registered types."""
...
@classmethod
def bulk_writer(
cls,
session: Optional[AsyncClientSession] = None,
ordered: bool = True,
bypass_document_validation: bool = False,
comment: Optional[str] = None,
) -> BulkWriter:
"""Get a bulk writer for union operations."""
...
# Settings configuration
class Settings:
name: Optional[str] = None
class_id: str = "_class_id"# Define different document types for the same collection
class Animal(Document):
name: str
species: str
class Settings:
collection = "animals"
is_root = True
class Dog(Animal):
breed: str
good_boy: bool = True
class Cat(Animal):
lives_remaining: int = 9
attitude: str = "aloof"
# Create union for polymorphic operations
class AnimalUnion(UnionDoc):
class Settings:
name = "animals"
# Register document types with names
AnimalUnion.register_doc("Dog", Dog)
AnimalUnion.register_doc("Cat", Cat)
# Query across all types
all_animals = await AnimalUnion.find_all().to_list()
pets_named_buddy = await AnimalUnion.find(Animal.name == "Buddy").to_list()from datetime import datetime
from typing import Optional, Dict, List, Type, Union, Any
from uuid import UUID
from bson import ObjectId
# Document lifecycle merge strategies
class MergeStrategy(Enum):
local = "local" # Prefer local changes
remote = "remote" # Prefer database state
# Base document type
DocumentType = TypeVar("DocumentType", bound=Document)
# Query interface types
FindInterface = TypeVar("FindInterface")
AggregateInterface = TypeVar("AggregateInterface")Install with Tessl CLI
npx tessl i tessl/pypi-beanie