0
# Document Models
1
2
Core document mapping classes that provide the foundation for MongoDB collection interactions in Beanie. These classes extend Pydantic models with MongoDB-specific functionality, offering CRUD operations, query interfaces, and lifecycle management.
3
4
## Capabilities
5
6
### Document
7
8
The primary document mapping class that represents a MongoDB document with full CRUD operations and query capabilities.
9
10
```python { .api }
11
class Document(BaseModel):
12
id: Optional[PydanticObjectId] = None
13
revision_id: Optional[UUID] = None
14
15
# Instance methods for CRUD operations
16
async def insert(
17
self,
18
*,
19
session: Optional[AsyncClientSession] = None,
20
link_rule: WriteRules = WriteRules.DO_NOTHING,
21
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
22
**kwargs
23
) -> "Document":
24
"""Insert this document into the collection."""
25
...
26
27
async def create(
28
self,
29
session: Optional[AsyncClientSession] = None,
30
**kwargs
31
) -> "Document":
32
"""Alias for insert() method."""
33
...
34
35
async def save(
36
self,
37
session: Optional[AsyncClientSession] = None,
38
link_rule: WriteRules = WriteRules.DO_NOTHING,
39
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
40
**kwargs
41
) -> "Document":
42
"""Insert or update this document."""
43
...
44
45
async def save_changes(
46
self,
47
ignore_revision: bool = False,
48
session: Optional[AsyncClientSession] = None,
49
bulk_writer: Optional[BulkWriter] = None,
50
link_rule: WriteRules = WriteRules.DO_NOTHING,
51
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
52
) -> Optional["Document"]:
53
"""Save only the changes made to this document."""
54
...
55
56
async def update(
57
self,
58
*args: Union[Dict[Any, Any], Mapping[Any, Any]],
59
ignore_revision: bool = False,
60
session: Optional[AsyncClientSession] = None,
61
bulk_writer: Optional[BulkWriter] = None,
62
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
63
skip_sync: bool = False,
64
**kwargs
65
) -> None:
66
"""Update this document with the given update query."""
67
...
68
69
async def replace(
70
self,
71
ignore_revision: bool = False,
72
session: Optional[AsyncClientSession] = None,
73
bulk_writer: Optional[BulkWriter] = None,
74
link_rule: WriteRules = WriteRules.DO_NOTHING,
75
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
76
**kwargs
77
) -> "Document":
78
"""Replace this document entirely in the collection."""
79
...
80
81
async def delete(
82
self,
83
session: Optional[AsyncClientSession] = None,
84
bulk_writer: Optional[BulkWriter] = None,
85
link_rule: DeleteRules = DeleteRules.DO_NOTHING,
86
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
87
**kwargs
88
) -> None:
89
"""Delete this document from the collection."""
90
...
91
92
async def sync(self, merge_strategy: MergeStrategy = MergeStrategy.remote) -> None:
93
"""Sync document with database using specified merge strategy."""
94
...
95
96
# Class methods for single document operations
97
@classmethod
98
async def get(
99
cls,
100
document_id: Any,
101
session: Optional[AsyncClientSession] = None,
102
ignore_cache: bool = False,
103
fetch_links: bool = False,
104
with_children: bool = False,
105
nesting_depth: Optional[int] = None,
106
nesting_depths_per_field: Optional[Dict[str, int]] = None,
107
**kwargs
108
) -> Optional["Document"]:
109
"""Get a document by its ID."""
110
...
111
112
@classmethod
113
async def insert_one(
114
cls,
115
document: "Document",
116
session: Optional[AsyncClientSession] = None,
117
**kwargs
118
) -> "Document":
119
"""Insert a single document."""
120
...
121
122
@classmethod
123
async def insert_many(
124
cls,
125
documents: Iterable["Document"],
126
session: Optional[AsyncClientSession] = None,
127
link_rule: WriteRules = WriteRules.DO_NOTHING,
128
skip_actions: Optional[List[Union[ActionDirections, str]]] = None,
129
**kwargs
130
) -> InsertManyResult:
131
"""Insert multiple documents."""
132
...
133
134
@classmethod
135
async def replace_many(
136
cls,
137
documents: List["Document"],
138
session: Optional[AsyncClientSession] = None,
139
**kwargs
140
) -> None:
141
"""Replace multiple documents."""
142
...
143
144
# Class methods for querying
145
@classmethod
146
async def find_one(
147
cls,
148
filter_query: Optional[Union[Dict, bool]] = None,
149
session: Optional[AsyncClientSession] = None,
150
ignore_cache: bool = False,
151
fetch_links: bool = False,
152
with_children: bool = False,
153
nesting_depth: Optional[int] = None,
154
nesting_depths_per_field: Optional[Dict[str, int]] = None,
155
**kwargs
156
) -> Optional["Document"]:
157
"""Find a single document matching the filter."""
158
...
159
160
@classmethod
161
def find(
162
cls,
163
filter_query: Optional[Union[Dict, bool]] = None,
164
skip: Optional[int] = None,
165
limit: Optional[int] = None,
166
session: Optional[AsyncClientSession] = None,
167
ignore_cache: bool = False,
168
fetch_links: bool = False,
169
with_children: bool = False,
170
lazy_parse: bool = False,
171
nesting_depth: Optional[int] = None,
172
nesting_depths_per_field: Optional[Dict[str, int]] = None,
173
**kwargs
174
) -> "FindInterface":
175
"""Find documents matching the filter."""
176
...
177
178
@classmethod
179
def find_all(
180
cls,
181
skip: Optional[int] = None,
182
limit: Optional[int] = None,
183
session: Optional[AsyncClientSession] = None,
184
ignore_cache: bool = False,
185
fetch_links: bool = False,
186
with_children: bool = False,
187
lazy_parse: bool = False,
188
**kwargs
189
) -> "FindInterface":
190
"""Find all documents in the collection."""
191
...
192
193
@classmethod
194
def aggregate(
195
cls,
196
pipeline: List[Dict],
197
projection_model: Optional[Type[BaseModel]] = None,
198
session: Optional[AsyncClientSession] = None,
199
ignore_cache: bool = False,
200
**kwargs
201
) -> "AggregateInterface":
202
"""Run an aggregation pipeline."""
203
...
204
205
@classmethod
206
async def count_documents(
207
cls,
208
filter_query: Optional[Dict] = None,
209
session: Optional[AsyncClientSession] = None,
210
**kwargs
211
) -> int:
212
"""Count documents matching the filter."""
213
...
214
215
@classmethod
216
async def distinct(
217
cls,
218
key: str,
219
filter_query: Optional[Dict] = None,
220
session: Optional[AsyncClientSession] = None,
221
**kwargs
222
) -> List[Any]:
223
"""Get distinct values for a field."""
224
...
225
226
# Class methods for bulk operations
227
@classmethod
228
def update_all(
229
cls,
230
update_query: Dict[str, Any],
231
session: Optional[AsyncClientSession] = None,
232
**kwargs
233
) -> UpdateMany:
234
"""Update all documents in the collection with the given update query."""
235
...
236
237
@classmethod
238
async def delete_all(
239
cls,
240
session: Optional[AsyncClientSession] = None,
241
**kwargs
242
) -> DeleteResult:
243
"""Delete all documents in the collection."""
244
...
245
246
@classmethod
247
def bulk_writer(
248
cls,
249
session: Optional[AsyncClientSession] = None,
250
**kwargs
251
) -> BulkWriter:
252
"""Get a bulk writer for this document class."""
253
...
254
255
# State management methods
256
@classmethod
257
def use_state_management(cls) -> bool:
258
"""Check if state management is enabled for this document class."""
259
...
260
261
@classmethod
262
def state_management_save_previous(cls) -> bool:
263
"""Check if previous state saving is enabled."""
264
...
265
266
@classmethod
267
def state_management_replace_objects(cls) -> bool:
268
"""Check if object replacement is enabled in state management."""
269
...
270
271
def get_saved_state(self) -> Optional[Dict[str, Any]]:
272
"""Get the saved state of this document."""
273
...
274
275
def get_previous_saved_state(self) -> Optional[Dict[str, Any]]:
276
"""Get the previous saved state of this document."""
277
...
278
279
@property
280
def is_changed(self) -> bool:
281
"""Check if document has any unsaved changes."""
282
...
283
284
@property
285
def has_changed(self) -> bool:
286
"""Check if the last save modified the document (previous saved state differs from the saved state)."""
287
...
288
289
def get_changes(self) -> Dict[str, Any]:
290
"""Get a dictionary of all changes made to this document."""
291
...
292
293
def get_previous_changes(self) -> Dict[str, Any]:
294
"""Get the previous changes made to this document."""
295
...
296
297
def rollback(self) -> None:
298
"""Rollback document to its saved state."""
299
...
300
301
# Link management methods
302
async def fetch_link(
303
self,
304
item: Union[Link, BackLink, str],
305
session: Optional[AsyncClientSession] = None,
306
**kwargs
307
) -> None:
308
"""Fetch a specific link field."""
309
...
310
311
async def fetch_all_links(
312
self,
313
session: Optional[AsyncClientSession] = None,
314
**kwargs
315
) -> None:
316
"""Fetch all link fields in this document."""
317
...
318
319
def to_ref(self) -> DBRef:
320
"""Convert this document to a MongoDB DBRef."""
321
...
322
323
@classmethod
324
def link_from_id(cls, object_id: PydanticObjectId) -> Link:
325
"""Create a Link object from an ObjectId."""
326
...
327
328
# Collection management
329
@classmethod
330
async def inspect_collection(
331
cls,
332
session: Optional[AsyncClientSession] = None
333
) -> InspectionResult:
334
"""Check whether the documents stored in the collection are compatible with this document's schema."""
335
...
336
337
@classmethod
338
def get_settings(cls) -> DocumentSettings:
339
"""Get the settings configuration for this document class."""
340
...
341
342
# Update operators (convenience methods)
343
def set(self, expression: Dict[Union[ExpressionField, str], Any]) -> SetOperator:
344
"""Create a Set update operator."""
345
...
346
347
def current_date(
348
self,
349
expression: Dict[Union[ExpressionField, str], Optional[str]]
350
) -> CurrentDate:
351
"""Create a CurrentDate update operator."""
352
...
353
354
def inc(self, expression: Dict[Union[ExpressionField, str], Union[int, float]]) -> Inc:
355
"""Create an Inc update operator."""
356
...
357
358
# Settings configuration
359
class Settings:
360
collection: Optional[str] = None
361
name: Optional[str] = None
362
indexes: Optional[List] = None
363
use_state_management: bool = False
364
validate_on_save: bool = False
365
use_revision_id: bool = False
366
use_enum_values: bool = False
367
bson_encoders: Optional[Dict] = None
368
lazy_parsing: bool = False
369
is_root: bool = False
370
union_doc: Optional[Type] = None
371
inheritance: bool = False
372
timeseries: Optional[TimeSeriesConfig] = None
373
```
374
375
#### Usage Examples
376
377
```python
378
# Define a document model
379
class User(Document):
380
name: str
381
email: str
382
age: int = 0
383
active: bool = True
384
385
class Settings:
386
collection = "users"
387
use_state_management = True
388
389
# Create and insert
390
user = User(name="Alice", email="alice@example.com", age=25)
391
await user.insert()
392
393
# Find operations
394
user = await User.find_one(User.email == "alice@example.com")
395
users = await User.find(User.age > 18).to_list()
396
active_users = await User.find(User.active == True).to_list()
397
398
# Update operations
399
await user.update({"$set": {"age": 26}})
400
await user.update({"$inc": {"age": 1}})
401
402
# Replace and delete
403
user.age = 30
404
await user.replace()
405
await user.delete()
406
```
407
408
### DocumentWithSoftDelete
409
410
Document class that implements soft delete functionality, marking documents as deleted rather than removing them permanently.
411
412
```python { .api }
413
class DocumentWithSoftDelete(Document):
414
deleted_at: Optional[datetime] = None
415
416
async def delete(self, **kwargs) -> None:
417
"""Soft delete by setting deleted_at timestamp."""
418
...
419
420
async def hard_delete(self, **kwargs) -> None:
421
"""Permanently delete the document from the collection."""
422
...
423
424
def is_deleted(self) -> bool:
425
"""Check if document is soft deleted."""
426
...
427
428
@classmethod
429
def find_many_in_all(cls, filter_query: Optional[Dict] = None, **kwargs) -> "FindInterface":
430
"""Find documents including soft deleted ones."""
431
...
432
433
@classmethod
434
def get_deleted(cls, **kwargs) -> "FindInterface":
435
"""Get only soft deleted documents."""
436
...
437
```
438
439
#### Usage Examples
440
441
```python
442
class SoftUser(DocumentWithSoftDelete):
443
name: str
444
email: str
445
446
class Settings:
447
collection = "soft_users"
448
449
user = SoftUser(name="Bob", email="bob@example.com")
450
await user.insert()
451
452
# Soft delete - sets deleted_at timestamp
453
await user.delete()
454
print(user.is_deleted()) # True
455
456
# Find excluding deleted
457
active_users = await SoftUser.find_all().to_list()
458
459
# Find including deleted
460
all_users = await SoftUser.find_many_in_all().to_list()
461
462
# Permanently delete
463
await user.hard_delete()
464
```
465
466
### View
467
468
Read-only mapping class for MongoDB views that provides query functionality without modification operations.
469
470
```python { .api }
471
class View(BaseModel):
472
id: Optional[PydanticObjectId] = None
473
474
@classmethod
475
async def find_one(cls, filter_query: Optional[Dict] = None, **kwargs) -> Optional["View"]:
476
"""Find a single document in the view."""
477
...
478
479
@classmethod
480
def find(cls, filter_query: Optional[Dict] = None, **kwargs) -> "FindInterface":
481
"""Find documents in the view."""
482
...
483
484
@classmethod
485
def find_all(cls, **kwargs) -> "FindInterface":
486
"""Find all documents in the view."""
487
...
488
489
@classmethod
490
def aggregate(cls, pipeline: List[Dict], **kwargs) -> "AggregateInterface":
491
"""Run an aggregation pipeline on the view."""
492
...
493
494
@classmethod
495
async def count_documents(cls, filter_query: Optional[Dict] = None) -> int:
496
"""Count documents in the view."""
497
...
498
499
# Settings configuration
500
class Settings:
501
source: Type[Document]
502
pipeline: List[Dict]
503
name: Optional[str] = None
504
```
505
506
#### Usage Examples
507
508
```python
509
# Define a view based on User documents
510
class ActiveUserView(View):
511
name: str
512
email: str
513
age: int
514
515
class Settings:
516
source = User
517
pipeline = [
518
{"$match": {"active": True}},
519
{"$project": {"name": 1, "email": 1, "age": 1}}
520
]
521
522
# Query the view (read-only)
523
active_users = await ActiveUserView.find_all().to_list()
524
young_users = await ActiveUserView.find(ActiveUserView.age < 30).to_list()
525
```
526
527
### UnionDoc
528
529
Base class for handling multiple document types within a single collection, enabling polymorphic document storage.
530
531
```python { .api }
532
class UnionDoc:
533
@classmethod
534
def register_doc(cls, name: str, doc_model: Type[Document]) -> None:
535
"""Register a document model with this union using a specific name."""
536
...
537
538
@classmethod
539
def find(
540
cls,
541
filter_query: Optional[Dict] = None,
542
session: Optional[AsyncClientSession] = None,
543
**kwargs
544
) -> "FindInterface":
545
"""Find documents across all registered types."""
546
...
547
548
@classmethod
549
def find_all(
550
cls,
551
session: Optional[AsyncClientSession] = None,
552
**kwargs
553
) -> "FindInterface":
554
"""Find all documents across all registered types."""
555
...
556
557
@classmethod
558
def aggregate(
559
cls,
560
pipeline: List[Dict],
561
session: Optional[AsyncClientSession] = None,
562
**kwargs
563
) -> "AggregateInterface":
564
"""Run aggregation across all registered types."""
565
...
566
567
@classmethod
568
def bulk_writer(
569
cls,
570
session: Optional[AsyncClientSession] = None,
571
ordered: bool = True,
572
bypass_document_validation: bool = False,
573
comment: Optional[str] = None,
574
) -> BulkWriter:
575
"""Get a bulk writer for union operations."""
576
...
577
578
# Settings configuration
579
class Settings:
580
name: Optional[str] = None
581
class_id: str = "_class_id"
582
```
583
584
#### Usage Examples
585
586
```python
587
# Define different document types for the same collection
588
class Animal(Document):
589
name: str
590
species: str
591
592
class Settings:
593
collection = "animals"
594
is_root = True
595
596
class Dog(Animal):
597
breed: str
598
good_boy: bool = True
599
600
class Cat(Animal):
601
lives_remaining: int = 9
602
attitude: str = "aloof"
603
604
# Create union for polymorphic operations
605
class AnimalUnion(UnionDoc):
606
class Settings:
607
name = "animals"
608
609
# Register document types with names
610
AnimalUnion.register_doc("Dog", Dog)
611
AnimalUnion.register_doc("Cat", Cat)
612
613
# Query across all types
614
all_animals = await AnimalUnion.find_all().to_list()
615
pets_named_buddy = await AnimalUnion.find(Animal.name == "Buddy").to_list()
616
```
617
618
## Types
619
620
```python { .api }
621
from datetime import datetime
622
from enum import Enum
from typing import Optional, Dict, List, Type, TypeVar, Union, Any
623
from uuid import UUID
624
from bson import ObjectId
625
626
# Document lifecycle merge strategies
627
class MergeStrategy(Enum):
628
local = "local" # Prefer local changes
629
remote = "remote" # Prefer database state
630
631
# Base document type
632
DocumentType = TypeVar("DocumentType", bound=Document)
633
634
# Query interface types
635
FindInterface = TypeVar("FindInterface")
636
AggregateInterface = TypeVar("AggregateInterface")
637
```