0
# Bulk Operations and Transactions
1
2
Bulk write operations, transaction support, and session management for high-performance and ACID-compliant operations.
3
4
## Capabilities
5
6
### Bulk Write Operations
7
8
Perform multiple write operations efficiently in a single request.
9
10
```python { .api }
11
class Collection:
12
def bulk_write(self, requests, ordered=True, bypass_document_validation=False, session=None):
13
"""
14
Execute bulk write operations.
15
16
Parameters:
17
- requests: list of write operation instances
18
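- ordered: if True (default), run operations serially in the given order and abort the remainder on the first error; if False, operations may run in any order and all are attempted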
19
- bypass_document_validation: skip document validation
20
- session: optional ClientSession
21
22
Returns:
23
BulkWriteResult: Result summary
24
"""
25
26
def initialize_ordered_bulk_op(self):
27
"""
28
Initialize an ordered bulk operation (deprecated; removed in PyMongo 4.0 - use bulk_write(requests, ordered=True) instead).
29
30
Returns:
31
BulkOperationBuilder: Bulk operation builder
32
"""
33
34
def initialize_unordered_bulk_op(self):
35
"""
36
Initialize an unordered bulk operation (deprecated; removed in PyMongo 4.0 - use bulk_write(requests, ordered=False) instead).
37
38
Returns:
39
BulkOperationBuilder: Bulk operation builder
40
"""
41
```
42
43
### Bulk Operation Types
44
45
Individual operation types for bulk writes.
46
47
```python { .api }
48
class InsertOne:
49
def __init__(self, document):
50
"""
51
Single document insert operation.
52
53
Parameters:
54
- document: document to insert
55
"""
56
57
class UpdateOne:
58
def __init__(self, filter, update, upsert=False, collation=None, array_filters=None, hint=None):
59
"""
60
Single document update operation.
61
62
Parameters:
63
- filter: query criteria
64
- update: update specification
65
- upsert: insert if no match found
66
- collation: collation options
67
- array_filters: array update filters
68
- hint: index hint
69
"""
70
71
class UpdateMany:
72
def __init__(self, filter, update, upsert=False, collation=None, array_filters=None, hint=None):
73
"""
74
Multiple document update operation.
75
76
Parameters:
77
- filter: query criteria
78
- update: update specification
79
- upsert: insert if no match found
80
- collation: collation options
81
- array_filters: array update filters
82
- hint: index hint
83
"""
84
85
class ReplaceOne:
86
def __init__(self, filter, replacement, upsert=False, collation=None, hint=None):
87
"""
88
Single document replacement operation.
89
90
Parameters:
91
- filter: query criteria
92
- replacement: replacement document
93
- upsert: insert if no match found
94
- collation: collation options
95
- hint: index hint
96
"""
97
98
class DeleteOne:
99
def __init__(self, filter, collation=None, hint=None):
100
"""
101
Single document delete operation.
102
103
Parameters:
104
- filter: query criteria
105
- collation: collation options
106
- hint: index hint
107
"""
108
109
class DeleteMany:
110
def __init__(self, filter, collation=None, hint=None):
111
"""
112
Multiple document delete operation.
113
114
Parameters:
115
- filter: query criteria
116
- collation: collation options
117
- hint: index hint
118
"""
119
120
class IndexModel:
121
def __init__(self, keys, **kwargs):
122
"""
123
Index creation specification, passed to Collection.create_indexes (it is not a bulk_write request type).
124
125
Parameters:
126
- keys: index key specification
127
- name: index name
128
- unique: unique constraint
129
- background: background build (deprecated)
130
- sparse: sparse index
131
- expireAfterSeconds: TTL seconds
132
- partialFilterExpression: partial index filter
133
- collation: collation options
134
"""
135
```
136
137
### Bulk Write Results
138
139
Result objects for bulk operations.
140
141
```python { .api }
142
class BulkWriteResult:
143
@property
144
def acknowledged(self):
145
"""
146
Write acknowledgment status.
147
148
Returns:
149
bool: True if acknowledged
150
"""
151
152
@property
153
def inserted_count(self):
154
"""
155
Number of inserted documents.
156
157
Returns:
158
int: Insert count
159
"""
160
161
@property
162
def matched_count(self):
163
"""
164
Number of matched documents for updates.
165
166
Returns:
167
int: Match count
168
"""
169
170
@property
171
def modified_count(self):
172
"""
173
Number of modified documents.
174
175
Returns:
176
int: Modification count
177
"""
178
179
@property
180
def deleted_count(self):
181
"""
182
Number of deleted documents.
183
184
Returns:
185
int: Delete count
186
"""
187
188
@property
189
def upserted_count(self):
190
"""
191
Number of upserted documents.
192
193
Returns:
194
int: Upsert count
195
"""
196
197
@property
198
def upserted_ids(self):
199
"""
200
Mapping of operation index to upserted _id.
201
202
Returns:
203
dict: {index: _id} mapping
204
"""
205
206
class BulkWriteError(OperationFailure):
207
@property
208
def details(self):
209
"""
210
Detailed error information.
211
212
Returns:
213
dict: Error details with writeErrors and writeConcernErrors
214
"""
215
```
216
217
### Transaction Support
218
219
Multi-document ACID transactions with session management.
220
221
```python { .api }
222
class MongoClient:
223
def start_session(
224
self,
225
causal_consistency=None,
226
default_transaction_options=None,
227
snapshot=False
228
):
229
"""
230
Start a client session.
231
232
Parameters:
233
- causal_consistency: enable causal consistency
234
- default_transaction_options: default TransactionOptions
235
- snapshot: enable snapshot reads
236
237
Returns:
238
ClientSession: Session for operations
239
"""
240
241
class ClientSession:
242
def start_transaction(self, read_concern=None, write_concern=None, read_preference=None, max_commit_time_ms=None):
243
"""
244
Start a multi-document transaction.
245
246
Parameters:
247
- read_concern: transaction read concern
248
- write_concern: transaction write concern
249
- read_preference: transaction read preference
250
- max_commit_time_ms: maximum commit time
251
252
Raises:
253
InvalidOperation: if transaction already started
254
"""
255
256
def commit_transaction(self):
257
"""
258
Commit the current transaction.
259
260
Raises:
261
InvalidOperation: if no active transaction
262
"""
263
264
def abort_transaction(self):
265
"""
266
Abort the current transaction.
267
268
Raises:
269
InvalidOperation: if no active transaction
270
"""
271
272
@property
273
def in_transaction(self):
274
"""
275
Check if session has active transaction.
276
277
Returns:
278
bool: True if transaction is active
279
"""
280
281
@property
282
def has_ended(self):
283
"""
284
Check if session has ended.
285
286
Returns:
287
bool: True if session ended
288
"""
289
290
def end_session(self):
291
"""End the session."""
292
293
def with_transaction(self, callback, read_concern=None, write_concern=None, read_preference=None, max_commit_time_ms=None):
294
"""
295
Execute callback in a transaction with automatic retry.
296
297
Parameters:
298
- callback: function to execute in transaction
299
- read_concern: transaction read concern
300
- write_concern: transaction write concern
301
- read_preference: transaction read preference
302
- max_commit_time_ms: maximum commit time
303
304
Returns:
305
Any: Result from callback function
306
"""
307
```
308
309
### Transaction Options
310
311
Configuration for transaction behavior.
312
313
```python { .api }
314
class TransactionOptions:
315
def __init__(
316
self,
317
read_concern=None,
318
write_concern=None,
319
read_preference=None,
320
max_commit_time_ms=None
321
):
322
"""
323
Transaction configuration options.
324
325
Parameters:
326
- read_concern: read concern for transaction
327
- write_concern: write concern for transaction
328
- read_preference: read preference for transaction
329
- max_commit_time_ms: maximum time for commit
330
"""
331
332
@property
333
def read_concern(self):
334
"""Transaction read concern."""
335
336
@property
337
def write_concern(self):
338
"""Transaction write concern."""
339
340
@property
341
def read_preference(self):
342
"""Transaction read preference."""
343
344
@property
345
def max_commit_time_ms(self):
346
"""Maximum commit time in milliseconds."""
347
```
348
349
## Usage Examples
350
351
### Bulk Write Operations
352
353
```python
354
from pymongo import MongoClient
355
from pymongo.errors import BulkWriteError
from pymongo.operations import InsertOne, UpdateOne, DeleteOne, ReplaceOne
356
357
client = MongoClient()
358
collection = client.mydb.mycollection
359
360
# Prepare bulk operations
361
requests = [
362
InsertOne({"name": "Alice", "age": 25}),
363
InsertOne({"name": "Bob", "age": 30}),
364
UpdateOne({"name": "Charlie"}, {"$set": {"age": 35}}),
365
ReplaceOne(
366
{"name": "David"},
367
{"name": "David", "age": 40, "status": "active"},
368
upsert=True
369
),
370
DeleteOne({"name": "Eve"})
371
]
372
373
# Execute bulk write
374
try:
375
result = collection.bulk_write(requests)
376
print(f"Inserted: {result.inserted_count}")
377
print(f"Modified: {result.modified_count}")
378
print(f"Deleted: {result.deleted_count}")
379
print(f"Upserted: {result.upserted_count}")
380
if result.upserted_ids:
381
print(f"Upserted IDs: {result.upserted_ids}")
382
except BulkWriteError as e:
383
print(f"Bulk write error: {e.details}")
384
385
# Unordered bulk write (attempts every operation, then raises BulkWriteError if any failed)
386
result = collection.bulk_write(requests, ordered=False)
387
```
388
389
### Transaction Usage
390
391
```python
392
from pymongo import MongoClient
393
from pymongo.errors import PyMongoError
394
395
client = MongoClient()
396
db = client.mydb
397
398
# Basic transaction
399
with client.start_session() as session:
400
with session.start_transaction():
401
db.accounts.update_one(
402
{"name": "Alice"},
403
{"$inc": {"balance": -100}},
404
session=session
405
)
406
db.accounts.update_one(
407
{"name": "Bob"},
408
{"$inc": {"balance": 100}},
409
session=session
410
)
411
# The transaction commits when the block exits normally and aborts if an exception is raised
412
413
# Manual transaction control
414
session = client.start_session()
415
try:
416
session.start_transaction()
417
418
# Perform operations
419
db.inventory.update_one(
420
{"item": "widget", "qty": {"$gte": 5}},
421
{"$inc": {"qty": -5}},
422
session=session
423
)
424
db.orders.insert_one(
425
{"item": "widget", "qty": 5, "status": "pending"},
426
session=session
427
)
428
429
# Commit transaction
430
session.commit_transaction()
431
print("Transaction committed")
432
433
except PyMongoError as e:
434
session.abort_transaction()
435
print(f"Transaction aborted: {e}")
436
finally:
437
session.end_session()
438
```
439
440
### Transaction with Callback
441
442
```python
443
def transfer_funds(session):
444
"""Transfer funds between accounts."""
445
# This function will be called within a transaction
446
accounts = session.client.mydb.accounts
447
448
# Check source account balance
449
source = accounts.find_one({"name": "Alice"}, session=session)
450
if source["balance"] < 100:
451
raise ValueError("Insufficient funds")
452
453
# Perform transfer
454
accounts.update_one(
455
{"name": "Alice"},
456
{"$inc": {"balance": -100}},
457
session=session
458
)
459
accounts.update_one(
460
{"name": "Bob"},
461
{"$inc": {"balance": 100}},
462
session=session
463
)
464
465
return "Transfer completed"
466
467
# Execute with automatic retry on transient errors
468
with client.start_session() as session:
469
try:
470
result = session.with_transaction(transfer_funds)
471
print(result)
472
except ValueError as e:
473
print(f"Business logic error: {e}")
474
```
475
476
### Advanced Bulk Operations
477
478
```python
479
from pymongo.operations import UpdateMany, InsertOne
480
from pymongo.write_concern import WriteConcern
481
482
# Bulk operations with write concern
483
requests = [
484
InsertOne({"category": "electronics", "price": 299.99}),
485
UpdateMany(
486
{"category": "electronics"},
487
{"$mul": {"price": 0.9}}, # 10% discount
488
hint="category_1" # Use specific index
489
),
490
UpdateMany(
491
{"status": "pending", "created": {"$lt": "2023-01-01"}},
492
{"$set": {"status": "expired"}},
493
collation={"locale": "en", "strength": 2}
494
)
495
]
496
497
# Execute with custom write concern
498
result = collection.with_options(write_concern=WriteConcern(w="majority")).bulk_write(
499
requests,
500
ordered=True,
501
bypass_document_validation=False
502
)
503
504
print(f"Bulk operation result: {result.bulk_api_result}")
505
```
506
507
### Session with Read Preferences
508
509
```python
510
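from datetime import datetime

from pymongo import ReadPreference
from pymongo.client_session import TransactionOptions
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern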
512
513
# Start session with specific options
514
with client.start_session(
515
causal_consistency=True,
516
default_transaction_options=TransactionOptions(
517
read_concern=ReadConcern("snapshot"),
518
write_concern=WriteConcern(w="majority"),
519
read_preference=ReadPreference.PRIMARY
520
)
521
) as session:
522
523
# Reads outside a transaction use the collection's own read settings; default_transaction_options apply only inside transactions
524
docs = list(collection.find({}, session=session))
525
526
# Transaction inherits default options
527
with session.start_transaction():
528
collection.insert_one({"timestamp": datetime.now()}, session=session)
529
collection.update_many(
530
{"processed": False},
531
{"$set": {"processed": True}},
532
session=session
533
)
534
```