CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-beanie

Asynchronous Python ODM for MongoDB with modern Pydantic-based document mapping

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/documents.md

Document Models

Core document mapping classes that provide the foundation for MongoDB collection interactions in Beanie. These classes extend Pydantic models with MongoDB-specific functionality, offering CRUD operations, query interfaces, and lifecycle management.

Capabilities

Document

The primary document mapping class that represents a MongoDB document with full CRUD operations and query capabilities.

class Document(BaseModel):
    """Primary document mapping class for a MongoDB document.

    This is an API outline: every method body is elided (``...``), so only
    the signatures and one-line summaries below are authoritative here.
    Methods declared with ``async def`` must be awaited; methods declared
    with plain ``def`` return an object synchronously.
    """

    id: Optional[PydanticObjectId] = None
    revision_id: Optional[UUID] = None
    
    # Instance methods for CRUD operations
    async def insert(
        self,
        *,
        session: Optional[AsyncClientSession] = None,
        link_rule: WriteRules = WriteRules.DO_NOTHING,
        skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
        **kwargs
    ) -> "Document":
        """Insert this document into the collection.

        Every option is keyword-only (note the bare ``*`` in the signature).
        """
        ...
    
    async def create(
        self,
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> "Document":
        """Alias for insert() method."""
        ...
    
    async def save(
        self,
        session: Optional[AsyncClientSession] = None,
        link_rule: WriteRules = WriteRules.DO_NOTHING,
        skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
        **kwargs
    ) -> "Document":
        """Insert or update this document."""
        ...
    
    async def save_changes(
        self,
        ignore_revision: bool = False,
        session: Optional[AsyncClientSession] = None,
        bulk_writer: Optional[BulkWriter] = None,
        link_rule: WriteRules = WriteRules.DO_NOTHING,
        skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
    ) -> Optional["Document"]:
        """Save only the changes made to this document.

        The return annotation is Optional, so callers must handle ``None``.
        NOTE(review): presumably ``None`` means there was nothing to save
        -- confirm against the Beanie documentation.
        """
        ...
    
    async def update(
        self,
        *args: Union[Dict[Any, Any], Mapping[Any, Any]],
        ignore_revision: bool = False,
        session: Optional[AsyncClientSession] = None,
        bulk_writer: Optional[BulkWriter] = None,
        skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
        skip_sync: bool = False,
        **kwargs
    ) -> None:
        """Update this document with the given update query.

        Update queries are passed positionally (``*args``); all other
        options are therefore keyword-only.
        """
        ...
    
    async def replace(
        self,
        ignore_revision: bool = False,
        session: Optional[AsyncClientSession] = None,
        bulk_writer: Optional[BulkWriter] = None,
        link_rule: WriteRules = WriteRules.DO_NOTHING,
        skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
        **kwargs
    ) -> "Document":
        """Replace this document entirely in the collection."""
        ...
    
    async def delete(
        self,
        session: Optional[AsyncClientSession] = None,
        bulk_writer: Optional[BulkWriter] = None,
        link_rule: DeleteRules = DeleteRules.DO_NOTHING,
        skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
        **kwargs
    ) -> None:
        """Delete this document from the collection."""
        ...
    
    async def sync(self, merge_strategy: MergeStrategy = MergeStrategy.remote) -> None:
        """Sync document with database using specified merge strategy."""
        ...
    
    # Class methods for single document operations
    @classmethod
    async def get(
        cls,
        document_id: Any,
        session: Optional[AsyncClientSession] = None,
        ignore_cache: bool = False,
        fetch_links: bool = False,
        with_children: bool = False,
        nesting_depth: Optional[int] = None,
        nesting_depths_per_field: Optional[Dict[str, int]] = None,
        **kwargs
    ) -> Optional["Document"]:
        """Get a document by its ID."""
        ...
    
    @classmethod
    async def insert_one(
        cls,
        document: "Document",
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> "Document":
        """Insert a single document."""
        ...
    
    @classmethod
    async def insert_many(
        cls,
        documents: Iterable["Document"],
        session: Optional[AsyncClientSession] = None,
        link_rule: WriteRules = WriteRules.DO_NOTHING,
        skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
        **kwargs
    ) -> InsertManyResult:
        """Insert multiple documents."""
        ...
    
    @classmethod
    async def replace_many(
        cls,
        documents: List["Document"],
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> None:
        """Replace multiple documents."""
        ...
    
    # Class methods for querying
    @classmethod
    async def find_one(
        cls,
        filter_query: Optional[Union[Dict, bool]] = None,
        session: Optional[AsyncClientSession] = None,
        ignore_cache: bool = False,
        fetch_links: bool = False,
        with_children: bool = False,
        nesting_depth: Optional[int] = None,
        nesting_depths_per_field: Optional[Dict[str, int]] = None,
        **kwargs
    ) -> Optional["Document"]:
        """Find a single document matching the filter."""
        ...
    
    @classmethod
    def find(
        cls,
        filter_query: Optional[Union[Dict, bool]] = None,
        skip: Optional[int] = None,
        limit: Optional[int] = None,
        session: Optional[AsyncClientSession] = None,
        ignore_cache: bool = False,
        fetch_links: bool = False,
        with_children: bool = False,
        lazy_parse: bool = False,
        nesting_depth: Optional[int] = None,
        nesting_depths_per_field: Optional[Dict[str, int]] = None,
        **kwargs
    ) -> "FindInterface":
        """Find documents matching the filter.

        Declared with plain ``def``: it returns a query object rather than a
        coroutine, so await a terminal call on the result (for example
        ``await Model.find(...).to_list()``) instead of awaiting ``find``.
        """
        ...
    
    @classmethod
    def find_all(
        cls,
        skip: Optional[int] = None,
        limit: Optional[int] = None,
        session: Optional[AsyncClientSession] = None,
        ignore_cache: bool = False,
        fetch_links: bool = False,
        with_children: bool = False,
        lazy_parse: bool = False,
        **kwargs
    ) -> "FindInterface":
        """Find all documents in the collection.

        Declared with plain ``def``: returns a query object, not a coroutine.
        """
        ...
    
    @classmethod
    def aggregate(
        cls,
        pipeline: List[Dict],
        projection_model: Optional[Type[BaseModel]] = None,
        session: Optional[AsyncClientSession] = None,
        ignore_cache: bool = False,
        **kwargs
    ) -> "AggregateInterface":
        """Run an aggregation pipeline.

        Declared with plain ``def``: returns an aggregation query object,
        not a coroutine.
        """
        ...
    
    @classmethod
    async def count_documents(
        cls,
        filter_query: Optional[Dict] = None,
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> int:
        """Count documents matching the filter."""
        ...
    
    @classmethod
    async def distinct(
        cls,
        key: str,
        filter_query: Optional[Dict] = None,
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> List[Any]:
        """Get distinct values for a field."""
        ...
    
    # Class methods for bulk operations
    @classmethod
    def update_all(
        cls,
        update_query: Dict[str, Any],
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> UpdateMany:
        """Apply ``update_query`` to all documents in the collection.

        This signature takes no filter parameter. Declared with plain
        ``def``: it returns an ``UpdateMany`` object rather than a coroutine.
        """
        ...
    
    @classmethod
    async def delete_all(
        cls,
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> DeleteResult:
        """Delete all documents in the collection."""
        ...
    
    @classmethod
    def bulk_writer(
        cls,
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> BulkWriter:
        """Get a bulk writer for this document class."""
        ...
    
    # State management methods
    @classmethod
    def use_state_management(cls) -> bool:
        """Check if state management is enabled for this document class."""
        ...
    
    @classmethod
    def state_management_save_previous(cls) -> bool:
        """Check if previous state saving is enabled."""
        ...
    
    @classmethod  
    def state_management_replace_objects(cls) -> bool:
        """Check if object replacement is enabled in state management."""
        ...
    
    def get_saved_state(self) -> Optional[Dict[str, Any]]:
        """Get the saved state of this document."""
        ...
    
    def get_previous_saved_state(self) -> Optional[Dict[str, Any]]:
        """Get the previous saved state of this document."""
        ...
    
    @property
    def is_changed(self) -> bool:
        """Check if document has any unsaved changes.

        This is a property: read it as ``doc.is_changed``, without calling it.
        """
        ...
    
    @property
    def has_changed(self) -> bool:
        """Check if document has changed since last save.

        This is a property: read it as ``doc.has_changed``, without calling it.
        NOTE(review): the difference from ``is_changed`` is not visible in
        this outline; presumably this one relates to the previous saved state
        (cf. ``get_previous_changes``) -- confirm against the Beanie docs.
        """
        ...
    
    def get_changes(self) -> Dict[str, Any]:
        """Get a dictionary of all changes made to this document."""
        ...
    
    def get_previous_changes(self) -> Dict[str, Any]:
        """Get the previous changes made to this document."""
        ...
    
    def rollback(self) -> None:
        """Rollback document to its saved state."""
        ...
    
    # Link management methods
    async def fetch_link(
        self,
        item: Union[Link, BackLink, str],
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> None:
        """Fetch a specific link field."""
        ...
    
    async def fetch_all_links(
        self,
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> None:
        """Fetch all link fields in this document."""
        ...
    
    def to_ref(self) -> DBRef:
        """Convert this document to a MongoDB DBRef."""
        ...
    
    @classmethod
    def link_from_id(cls, object_id: PydanticObjectId) -> Link:
        """Create a Link object from an ObjectId."""
        ...
    
    # Collection management
    @classmethod
    async def inspect_collection(
        cls,
        session: Optional[AsyncClientSession] = None
    ) -> InspectionResult:
        """Inspect the collection for validation and integrity."""
        ...
    
    @classmethod
    def get_settings(cls) -> DocumentSettings:
        """Get the settings configuration for this document class."""
        ...
    
    # Update operators (convenience methods)
    def set(self, expression: Dict[Union[ExpressionField, str], Any]) -> SetOperator:
        """Create a Set update operator."""
        ...
    
    def current_date(
        self,
        expression: Dict[Union[ExpressionField, str], Optional[str]]
    ) -> CurrentDate:
        """Create a CurrentDate update operator."""
        ...
    
    def inc(self, expression: Dict[Union[ExpressionField, str], Union[int, float]]) -> Inc:
        """Create an Inc update operator."""
        ...
    
    # Settings configuration
    # NOTE(review): both `collection` and `name` are listed below and both look
    # like they name the target collection -- confirm which one the installed
    # Beanie version honours before relying on either.
    class Settings:
        collection: Optional[str] = None
        name: Optional[str] = None
        indexes: Optional[List] = None
        use_state_management: bool = False
        validate_on_save: bool = False
        use_revision_id: bool = False
        use_enum_values: bool = False
        bson_encoders: Optional[Dict] = None
        lazy_parsing: bool = False
        is_root: bool = False
        union_doc: Optional[Type] = None
        inheritance: bool = False
        timeseries: Optional[TimeSeriesConfig] = None

Usage Examples

# Define a document model
# NOTE: the `await` statements in this example are shown at module level for
# brevity; in a real program they must run inside a coroutine (for example an
# `async def main()` driven by `asyncio.run`).
class User(Document):
    name: str
    email: str
    age: int = 0
    active: bool = True
    
    class Settings:
        collection = "users"
        use_state_management = True

# Create and insert
user = User(name="Alice", email="alice@example.com", age=25)
await user.insert()

# Find operations
# Comparisons on class attributes (User.email == ...) are passed to find() /
# find_one() as the filter. The explicit `== True` below is part of that filter
# expression, so presumably it must not be "simplified" to `is True` or a bare
# truthiness test -- confirm against the Beanie query docs.
user = await User.find_one(User.email == "alice@example.com")
users = await User.find(User.age > 18).to_list()
active_users = await User.find(User.active == True).to_list()

# Update operations
await user.update({"$set": {"age": 26}})
await user.update({"$inc": {"age": 1}})

# Replace and delete
user.age = 30
await user.replace()
await user.delete()

DocumentWithSoftDelete

Document class that implements soft delete functionality, marking documents as deleted rather than removing them permanently.

class DocumentWithSoftDelete(Document):
    """Document variant that marks records as deleted instead of removing them.

    API outline only: method bodies are elided (``...``).
    """

    # Defaults to None; per the delete() summary below, a soft delete sets it.
    deleted_at: Optional[datetime] = None
    
    async def delete(self, **kwargs) -> None:
        """Soft delete by setting deleted_at timestamp.

        Overrides ``Document.delete``; use ``hard_delete`` for permanent removal.
        """
        ...
    
    async def hard_delete(self, **kwargs) -> None:
        """Permanently delete the document from the collection."""
        ...
    
    def is_deleted(self) -> bool:
        """Check if document is soft deleted.

        A regular method, not a property: call it as ``doc.is_deleted()``.
        """
        ...
    
    @classmethod
    def find_many_in_all(cls, filter_query: Optional[Dict] = None, **kwargs) -> "FindInterface":
        """Find documents including soft deleted ones.

        Declared with plain ``def``: returns a query object, not a coroutine.
        """
        ...
    
    @classmethod
    def get_deleted(cls, **kwargs) -> "FindInterface":
        """Get only soft deleted documents.

        Declared with plain ``def``: returns a query object, not a coroutine.
        """
        ...

Usage Examples

# NOTE: the `await` statements below are shown at module level for brevity;
# they must run inside a coroutine in a real program.
class SoftUser(DocumentWithSoftDelete):
    name: str
    email: str
    
    class Settings:
        collection = "soft_users"

user = SoftUser(name="Bob", email="bob@example.com")
await user.insert()

# Soft delete - sets deleted_at timestamp
await user.delete()
print(user.is_deleted())  # True

# Find excluding deleted
active_users = await SoftUser.find_all().to_list()

# Find including deleted
all_users = await SoftUser.find_many_in_all().to_list()

# Permanently delete
await user.hard_delete()

View

Read-only mapping class for MongoDB views that provides query functionality without modification operations.

class View(BaseModel):
    """Read-only mapping class for a MongoDB view.

    Only query methods are listed; no insert/update/delete methods appear in
    this outline. Method bodies are elided (``...``).
    """

    id: Optional[PydanticObjectId] = None
    
    @classmethod
    async def find_one(cls, filter_query: Optional[Dict] = None, **kwargs) -> Optional["View"]:
        """Find a single document in the view."""
        ...
    
    @classmethod
    def find(cls, filter_query: Optional[Dict] = None, **kwargs) -> "FindInterface":
        """Find documents in the view.

        Declared with plain ``def``: returns a query object, not a coroutine.
        """
        ...
    
    @classmethod
    def find_all(cls, **kwargs) -> "FindInterface":
        """Find all documents in the view.

        Declared with plain ``def``: returns a query object, not a coroutine.
        """
        ...
    
    @classmethod
    def aggregate(cls, pipeline: List[Dict], **kwargs) -> "AggregateInterface":
        """Run an aggregation pipeline on the view.

        Declared with plain ``def``: returns a query object, not a coroutine.
        """
        ...
    
    @classmethod
    async def count_documents(cls, filter_query: Optional[Dict] = None) -> int:
        """Count documents in the view."""
        ...
    
    # Settings configuration
    # `source` and `pipeline` are annotated without defaults, so a concrete
    # view is expected to provide both; `name` is optional.
    class Settings:
        source: Type[Document]
        pipeline: List[Dict]
        name: Optional[str] = None

Usage Examples

# Define a view based on User documents
# (`User` is the document model defined in the Document usage example.)
class ActiveUserView(View):
    name: str
    email: str
    age: int
    
    class Settings:
        source = User
        pipeline = [
            {"$match": {"active": True}},
            {"$project": {"name": 1, "email": 1, "age": 1}}
        ]

# Query the view (read-only)
# NOTE: `await` is shown at module level for brevity; it must run inside a
# coroutine in a real program.
active_users = await ActiveUserView.find_all().to_list()
young_users = await ActiveUserView.find(ActiveUserView.age < 30).to_list()

UnionDoc

Base class for handling multiple document types within a single collection, enabling polymorphic document storage.

class UnionDoc:
    """Base class for handling multiple document types within one collection.

    API outline only: method bodies are elided (``...``). All methods are
    classmethods declared with plain ``def``, so none of them is a coroutine.
    """

    @classmethod
    def register_doc(cls, name: str, doc_model: Type[Document]) -> None:
        """Register a document model with this union using a specific name."""
        ...
    
    @classmethod
    def find(
        cls, 
        filter_query: Optional[Dict] = None,
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> "FindInterface":
        """Find documents across all registered types."""
        ...
    
    @classmethod
    def find_all(
        cls,
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> "FindInterface":
        """Find all documents across all registered types."""
        ...
    
    @classmethod
    def aggregate(
        cls, 
        pipeline: List[Dict],
        session: Optional[AsyncClientSession] = None,
        **kwargs
    ) -> "AggregateInterface":
        """Run aggregation across all registered types."""
        ...
    
    @classmethod
    def bulk_writer(
        cls,
        session: Optional[AsyncClientSession] = None,
        ordered: bool = True,
        bypass_document_validation: bool = False,
        comment: Optional[str] = None,
    ) -> BulkWriter:
        """Get a bulk writer for union operations."""
        ...
    
    # Settings configuration
    # NOTE(review): `class_id` presumably names the stored field used to tell
    # the registered document types apart -- confirm against the Beanie docs.
    class Settings:
        name: Optional[str] = None
        class_id: str = "_class_id"

Usage Examples

# Define different document types for the same collection
# NOTE: the `await` statements at the end of this example are shown at module
# level for brevity; they must run inside a coroutine in a real program.
class Animal(Document):
    name: str
    species: str
    
    class Settings:
        collection = "animals"
        is_root = True

class Dog(Animal):
    breed: str
    good_boy: bool = True

class Cat(Animal):
    lives_remaining: int = 9
    attitude: str = "aloof"

# Create union for polymorphic operations
class AnimalUnion(UnionDoc):
    class Settings:
        name = "animals"

# Register document types with names
AnimalUnion.register_doc("Dog", Dog)
AnimalUnion.register_doc("Cat", Cat)

# Query across all types
all_animals = await AnimalUnion.find_all().to_list()
pets_named_buddy = await AnimalUnion.find(Animal.name == "Buddy").to_list()

Types

# Standard-library names used by the signatures on this page.
# `Enum` and `TypeVar` are required by the definitions below; `Iterable` and
# `Mapping` appear in the Document method signatures.
from datetime import datetime
from enum import Enum
from typing import Any, Dict, Iterable, List, Mapping, Optional, Type, TypeVar, Union
from uuid import UUID

from bson import ObjectId

# Document lifecycle merge strategies
class MergeStrategy(Enum):
    local = "local"    # Prefer local changes
    remote = "remote"  # Prefer database state

# Base document type
# (`Document` is the Beanie document class outlined earlier on this page.)
DocumentType = TypeVar("DocumentType", bound=Document)

# Query interface types
FindInterface = TypeVar("FindInterface")
AggregateInterface = TypeVar("AggregateInterface")

Install with Tessl CLI

npx tessl i tessl/pypi-beanie

docs

bulk-operations.md

custom-types.md

documents.md

events-actions.md

fields-types.md

index.md

initialization.md

migrations.md

query-operations.md

time-series.md

tile.json