Official Python driver for MongoDB providing comprehensive tools for database operations, BSON handling, and GridFS file storage
—
Bulk write operations, transaction support, and session management for high-performance and ACID-compliant operations.
Perform multiple write operations efficiently in a single request.
class Collection:
def bulk_write(self, requests, ordered=True, bypass_document_validation=False, session=None):
"""
Execute bulk write operations.
Parameters:
- requests: list of write operation instances
- ordered: stop on first error if True
- bypass_document_validation: skip document validation
- session: optional ClientSession
Returns:
BulkWriteResult: Result summary
"""
def initialize_ordered_bulk_op(self):
"""
Initialize ordered bulk operation (deprecated).
Returns:
BulkOperationBuilder: Bulk operation builder
"""
def initialize_unordered_bulk_op(self):
"""
Initialize unordered bulk operation (deprecated).
Returns:
BulkOperationBuilder: Bulk operation builder
"""Individual operation types for bulk writes.
class InsertOne:
def __init__(self, document):
"""
Single document insert operation.
Parameters:
- document: document to insert
"""
class UpdateOne:
def __init__(self, filter, update, upsert=False, collation=None, array_filters=None, hint=None):
"""
Single document update operation.
Parameters:
- filter: query criteria
- update: update specification
- upsert: insert if no match found
- collation: collation options
- array_filters: array update filters
- hint: index hint
"""
class UpdateMany:
def __init__(self, filter, update, upsert=False, collation=None, array_filters=None, hint=None):
"""
Multiple document update operation.
Parameters:
- filter: query criteria
- update: update specification
- upsert: insert if no match found
- collation: collation options
- array_filters: array update filters
- hint: index hint
"""
class ReplaceOne:
def __init__(self, filter, replacement, upsert=False, collation=None, hint=None):
"""
Single document replacement operation.
Parameters:
- filter: query criteria
- replacement: replacement document
- upsert: insert if no match found
- collation: collation options
- hint: index hint
"""
class DeleteOne:
def __init__(self, filter, collation=None, hint=None):
"""
Single document delete operation.
Parameters:
- filter: query criteria
- collation: collation options
- hint: index hint
"""
class DeleteMany:
def __init__(self, filter, collation=None, hint=None):
"""
Multiple document delete operation.
Parameters:
- filter: query criteria
- collation: collation options
- hint: index hint
"""
class IndexModel:
def __init__(self, keys, **kwargs):
"""
Index creation specification.
Parameters:
- keys: index key specification
- name: index name
- unique: unique constraint
- background: background build (deprecated)
- sparse: sparse index
- expireAfterSeconds: TTL seconds
- partialFilterExpression: partial index filter
- collation: collation options
"""Result objects for bulk operations.
class BulkWriteResult:
@property
def acknowledged(self):
"""
Write acknowledgment status.
Returns:
bool: True if acknowledged
"""
@property
def inserted_count(self):
"""
Number of inserted documents.
Returns:
int: Insert count
"""
@property
def matched_count(self):
"""
Number of matched documents for updates.
Returns:
int: Match count
"""
@property
def modified_count(self):
"""
Number of modified documents.
Returns:
int: Modification count
"""
@property
def deleted_count(self):
"""
Number of deleted documents.
Returns:
int: Delete count
"""
@property
def upserted_count(self):
"""
Number of upserted documents.
Returns:
int: Upsert count
"""
@property
def upserted_ids(self):
"""
Mapping of operation index to upserted _id.
Returns:
dict: {index: _id} mapping
"""
class BulkWriteError(OperationFailure):
@property
def details(self):
"""
Detailed error information.
Returns:
dict: Error details with writeErrors and writeConcernErrors
"""Multi-document ACID transactions with session management.
class MongoClient:
def start_session(
self,
causal_consistency=None,
default_transaction_options=None,
snapshot=False
):
"""
Start a client session.
Parameters:
- causal_consistency: enable causal consistency
- default_transaction_options: default TransactionOptions
- snapshot: enable snapshot reads
Returns:
ClientSession: Session for operations
"""
class ClientSession:
def start_transaction(self, read_concern=None, write_concern=None, read_preference=None, max_commit_time_ms=None):
"""
Start a multi-document transaction.
Parameters:
- read_concern: transaction read concern
- write_concern: transaction write concern
- read_preference: transaction read preference
- max_commit_time_ms: maximum commit time
Raises:
InvalidOperation: if transaction already started
"""
def commit_transaction(self):
"""
Commit the current transaction.
Raises:
InvalidOperation: if no active transaction
"""
def abort_transaction(self):
"""
Abort the current transaction.
Raises:
InvalidOperation: if no active transaction
"""
@property
def in_transaction(self):
"""
Check if session has active transaction.
Returns:
bool: True if transaction is active
"""
@property
def has_ended(self):
"""
Check if session has ended.
Returns:
bool: True if session ended
"""
def end_session(self):
"""End the session."""
def with_transaction(self, callback, read_concern=None, write_concern=None, read_preference=None, max_commit_time_ms=None):
"""
Execute callback in a transaction with automatic retry.
Parameters:
- callback: function to execute in transaction
- read_concern: transaction read concern
- write_concern: transaction write concern
- read_preference: transaction read preference
- max_commit_time_ms: maximum commit time
Returns:
Any: Result from callback function
"""Configuration for transaction behavior.
class TransactionOptions:
def __init__(
self,
read_concern=None,
write_concern=None,
read_preference=None,
max_commit_time_ms=None
):
"""
Transaction configuration options.
Parameters:
- read_concern: read concern for transaction
- write_concern: write concern for transaction
- read_preference: read preference for transaction
- max_commit_time_ms: maximum time for commit
"""
@property
def read_concern(self):
"""Transaction read concern."""
@property
def write_concern(self):
"""Transaction write concern."""
@property
def read_preference(self):
"""Transaction read preference."""
@property
def max_commit_time_ms(self):
"""Maximum commit time in milliseconds."""from pymongo import MongoClient
from pymongo.operations import InsertOne, UpdateOne, DeleteOne, ReplaceOne
client = MongoClient()
collection = client.mydb.mycollection
# Prepare bulk operations
requests = [
InsertOne({"name": "Alice", "age": 25}),
InsertOne({"name": "Bob", "age": 30}),
UpdateOne({"name": "Charlie"}, {"$set": {"age": 35}}),
ReplaceOne(
{"name": "David"},
{"name": "David", "age": 40, "status": "active"},
upsert=True
),
DeleteOne({"name": "Eve"})
]
# Execute bulk write
try:
result = collection.bulk_write(requests)
print(f"Inserted: {result.inserted_count}")
print(f"Modified: {result.modified_count}")
print(f"Deleted: {result.deleted_count}")
print(f"Upserted: {result.upserted_count}")
if result.upserted_ids:
print(f"Upserted IDs: {result.upserted_ids}")
except BulkWriteError as e:
print(f"Bulk write error: {e.details}")
# Unordered bulk write (continues on errors)
result = collection.bulk_write(requests, ordered=False)from pymongo import MongoClient
from pymongo.errors import PyMongoError
client = MongoClient()
db = client.mydb
# Basic transaction
with client.start_session() as session:
with session.start_transaction():
db.accounts.update_one(
{"name": "Alice"},
{"$inc": {"balance": -100}},
session=session
)
db.accounts.update_one(
{"name": "Bob"},
{"$inc": {"balance": 100}},
session=session
)
# Transaction commits automatically on context exit
# Manual transaction control
session = client.start_session()
try:
session.start_transaction()
# Perform operations
db.inventory.update_one(
{"item": "widget", "qty": {"$gte": 5}},
{"$inc": {"qty": -5}},
session=session
)
db.orders.insert_one(
{"item": "widget", "qty": 5, "status": "pending"},
session=session
)
# Commit transaction
session.commit_transaction()
print("Transaction committed")
except PyMongoError as e:
session.abort_transaction()
print(f"Transaction aborted: {e}")
finally:
session.end_session()def transfer_funds(session):
"""Transfer funds between accounts."""
# This function will be called within a transaction
accounts = session.client.mydb.accounts
# Check source account balance
source = accounts.find_one({"name": "Alice"}, session=session)
if source["balance"] < 100:
raise ValueError("Insufficient funds")
# Perform transfer
accounts.update_one(
{"name": "Alice"},
{"$inc": {"balance": -100}},
session=session
)
accounts.update_one(
{"name": "Bob"},
{"$inc": {"balance": 100}},
session=session
)
return "Transfer completed"
# Execute with automatic retry on transient errors
with client.start_session() as session:
try:
result = session.with_transaction(transfer_funds)
print(result)
except ValueError as e:
print(f"Business logic error: {e}")from pymongo.operations import UpdateMany, InsertOne
from pymongo.write_concern import WriteConcern
# Bulk operations with write concern
requests = [
InsertOne({"category": "electronics", "price": 299.99}),
UpdateMany(
{"category": "electronics"},
{"$mul": {"price": 0.9}}, # 10% discount
hint="category_1" # Use specific index
),
UpdateMany(
{"status": "pending", "created": {"$lt": "2023-01-01"}},
{"$set": {"status": "expired"}},
collation={"locale": "en", "strength": 2}
)
]
# Execute with custom write concern
result = collection.bulk_write(
requests,
ordered=True,
bypass_document_validation=False
)
print(f"Bulk operation result: {result.bulk_api_result}")from pymongo import ReadPreference
from pymongo.read_concern import ReadConcern
# Start session with specific options
with client.start_session(
causal_consistency=True,
default_transaction_options=TransactionOptions(
read_concern=ReadConcern("snapshot"),
write_concern=WriteConcern(w="majority"),
read_preference=ReadPreference.PRIMARY
)
) as session:
# Read operations will use session's read preference
docs = list(collection.find({}, session=session))
# Transaction inherits default options
with session.start_transaction():
collection.insert_one({"timestamp": datetime.now()}, session=session)
collection.update_many(
{"processed": False},
{"$set": {"processed": True}},
session=session
)Install with Tessl CLI
npx tessl i tessl/pypi-pymongo