Asynchronous Python ODM for MongoDB with modern Pydantic-based document mapping
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Pre/post event hooks for document lifecycle events with action type constants, enabling flexible business logic integration and data validation workflows.
Decorators that register functions to run before or after specific document lifecycle events.
def before_event(*actions):
    """Register a function to run before the given document events.

    Args:
        *actions: Event action types (Insert, Update, Save, Delete, etc.).

    Returns:
        Decorator function for event handlers.
    """
    ...
def after_event(*actions):
    """Register a function to run after the given document events.

    Args:
        *actions: Event action types (Insert, Update, Save, Delete, etc.).

    Returns:
        Decorator function for event handlers.
    """
    ...


# Event type constants that specify which document lifecycle events to
# handle.
class Insert:
    """Document insertion event type."""

    ...
class Replace:
    """Document replacement event type."""

    ...
class Save:
    """Document save event type (insert or update)."""

    ...
class SaveChanges:
    """Document save changes event type."""

    ...
class ValidateOnSave:
    """Document validation on save event type."""

    ...
class Delete:
    """Document deletion event type."""

    ...
class Update:
    """Document update event type."""

    ...


# Constants that specify timing of event execution relative to the
# document operation.
class Before:
    """Before event timing constant."""

    ...
class After:
    """After event timing constant."""

    ...


from beanie import Document, before_event, after_event
import logging
from datetime import datetime, timezone
from typing import Dict, List, Optional

from beanie import Insert, Update, Save, Delete
class User(Document):
    """Example document that timestamps, logs and audits itself via hooks.

    The coroutines awaited by the handlers (``send_welcome_email``,
    ``log_user_changes``, ``cleanup_user_posts``, ``cleanup_user_sessions``)
    are not defined in this example; the application is expected to
    provide them.
    """

    name: str
    email: str
    # Both timestamps default to None and are filled in by the hooks below,
    # so the annotation has to admit None.
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None

    class Settings:
        # Beanie takes the collection name from ``name``.
        name = "users"
        # ``get_changes()`` (used in ``audit_changes``) requires state
        # management to be switched on.
        use_state_management = True

    # Before event handlers
    @before_event(Insert)
    async def set_creation_time(self):
        """Set creation and update timestamps before insert."""
        # Timezone-aware "now"; ``datetime.utcnow()`` is deprecated and
        # returns a naive value.
        now = datetime.now(timezone.utc)
        self.created_at = now
        self.updated_at = now

    @before_event(Update, Save)
    async def set_update_time(self):
        """Set update timestamp before modifications."""
        self.updated_at = datetime.now(timezone.utc)

    @before_event(Delete)
    async def log_deletion(self):
        """Log before deleting document."""
        logging.info(f"Deleting user: {self.name} ({self.email})")

    # After event handlers
    @after_event(Insert)
    async def welcome_new_user(self):
        """Send welcome email after inserting user."""
        await send_welcome_email(self.email)
        logging.info(f"New user registered: {self.name}")

    @after_event(Update)
    async def audit_changes(self):
        """Audit user changes after update."""
        # NOTE(review): depending on when Beanie snapshots the saved state,
        # the diff may already be empty inside an "after" hook -- confirm.
        await log_user_changes(self.id, self.get_changes())

    @after_event(Delete)
    async def cleanup_user_data(self):
        """Clean up related data after user deletion."""
        await cleanup_user_posts(self.id)
        await cleanup_user_sessions(self.id)


from beanie import Document, before_event, after_event
from beanie import Insert, Update, Save, Delete, Replace
class Product(Document):
    """Example document whose handlers are bound to several event types.

    ``add_to_search_index``, ``remove_from_search_index`` and ``cache`` are
    not defined in this example; the application is expected to provide
    them.
    """

    name: str
    price: float
    stock: int
    active: bool = True

    class Settings:
        # Beanie takes the collection name from ``name``.
        name = "products"

    # Handle multiple event types
    @before_event(Insert, Update, Replace, Save)
    async def validate_product(self):
        """Validate product data before any write operation.

        Raises:
            ValueError: If ``price`` or ``stock`` is negative.
        """
        if self.price < 0:
            raise ValueError("Price cannot be negative")
        if self.stock < 0:
            raise ValueError("Stock cannot be negative")

    @after_event(Update, Save)
    async def update_search_index(self):
        """Update search index after product changes."""
        # Active products are (re)indexed; inactive ones are removed.
        if self.active:
            await add_to_search_index(self)
        else:
            await remove_from_search_index(self.id)

    @after_event(Insert, Update, Replace)
    async def invalidate_cache(self):
        """Invalidate product cache after modifications."""
        # Drop both the per-product entry and the cached listing.
        await cache.delete(f"product:{self.id}")
        await cache.delete("products:all")


from beanie import Document, before_event, after_event
from beanie import Update, Save
import asyncio
class Order(Document):
    """Example document that reacts to changes of its ``status`` field.

    ``send_email`` and ``restore_item_stock`` are not defined in this
    example; the application is expected to provide them.
    """

    status: str
    total: float
    customer_email: str
    # Each item is read as a mapping with 'product_id' and 'quantity' keys
    # (see ``restore_inventory``).
    # NOTE(review): the list default presumably relies on Pydantic copying
    # field defaults per instance -- confirm.
    items: List[Dict] = []

    class Settings:
        # Beanie takes the collection name from ``name``.
        name = "orders"

    @before_event(Update, Save)
    async def handle_status_change(self):
        """Handle order status changes."""
        # Get previous state if available. Nothing in this example sets
        # ``_previous_status``; without it there is nothing to compare.
        if not hasattr(self, '_previous_status'):
            return

        old_status = self._previous_status
        new_status = self.status

        # Only act on status changes
        if old_status == new_status:
            return

        if new_status == "shipped":
            await self.send_shipping_notification()
        elif new_status == "delivered":
            await self.send_delivery_confirmation()
        elif new_status == "cancelled":
            await self.restore_inventory()

    async def send_shipping_notification(self):
        """Send shipping notification email."""
        await send_email(
            to=self.customer_email,
            subject="Your order has shipped!",
            body=f"Order #{self.id} is on its way."
        )

    async def send_delivery_confirmation(self):
        """Send delivery confirmation."""
        await send_email(
            to=self.customer_email,
            subject="Order delivered",
            body=f"Order #{self.id} has been delivered."
        )

    async def restore_inventory(self):
        """Restore inventory for cancelled order."""
        for item in self.items:
            await restore_item_stock(item['product_id'], item['quantity'])


# Register event handlers on document classes
class BlogPost(Document):
    """Example document with method-based event handlers.

    ``get_subscribers`` and ``notify_post_published`` are not defined in
    this example; the application is expected to provide them.
    """

    title: str
    content: str
    author_id: str
    published: bool = False
    view_count: int = 0
    # Declared so that ``generate_slug`` can assign it; assigning an
    # undeclared attribute on a Pydantic model is rejected.
    slug: str = ""

    class Settings:
        # Beanie takes the collection name from ``name``.
        name = "posts"

    # Method-based event handlers
    @before_event(Insert)
    async def generate_slug(self):
        """Generate URL slug before insert."""
        self.slug = self.title.lower().replace(" ", "-")

    @after_event(Update)
    async def notify_subscribers(self):
        """Notify subscribers when post is published."""
        # ``_was_draft`` is not set anywhere in this example; the handler
        # only fires when the caller has put that marker on the instance.
        if self.published and hasattr(self, '_was_draft'):
            subscribers = await get_subscribers(self.author_id)
            await notify_post_published(subscribers, self)


# External event handlers
# NOTE(review): Beanie appears to collect event handlers from the members of
# the document class; confirm that decorating a module-level function like
# the two below actually registers it for BlogPost.
@before_event(Delete)
async def archive_post_content(doc: BlogPost):
    """Archive post content before deletion."""
    await archive_content(doc.id, doc.content)


@after_event(Insert, Update)
async def update_author_stats(doc: BlogPost):
    """Update author statistics after post changes."""
    await update_author_post_count(doc.author_id)


from typing import Callable, Any, Awaitable
from enum import Enum
# Event handler type: a callable taking one argument of any type and
# returning an awaitable that resolves to None.
EventHandler = Callable[[Any], Awaitable[None]]
# Action type enumeration
class ActionType(Enum):
    """Document lifecycle actions; each member's value equals its name."""

    INSERT = "INSERT"
    UPDATE = "UPDATE"
    REPLACE = "REPLACE"
    SAVE = "SAVE"
    SAVE_CHANGES = "SAVE_CHANGES"
    VALIDATE_ON_SAVE = "VALIDATE_ON_SAVE"
    DELETE = "DELETE"
# Direction type enumeration
class ActionDirection(Enum):
    """Timing of a handler relative to the document operation."""

    BEFORE = "BEFORE"
    AFTER = "AFTER"


# Install with Tessl CLI
npx tessl i tessl/pypi-beanie