CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pymongo

Official Python driver for MongoDB providing comprehensive tools for database operations, BSON handling, and GridFS file storage

Pending
Overview
Eval results
Files

docs/bulk-transactions.md

Bulk Operations and Transactions

Bulk write operations, transaction support, and session management for high-performance and ACID-compliant operations.

Capabilities

Bulk Write Operations

Perform multiple write operations efficiently in a single request.

class Collection:
    def bulk_write(
        self,
        requests,
        ordered=True,
        bypass_document_validation=False,
        session=None,
    ):
        """
        Send a batch of write operations in a single request.

        Parameters:
        - requests: list of write operation instances (for example
          InsertOne, UpdateOne, UpdateMany, ReplaceOne, DeleteOne, DeleteMany)
        - ordered: if True, stop at the first error; pass False to keep
          going after an operation fails
        - bypass_document_validation: if True, skip document validation
        - session: optional ClientSession to run the batch on

        Returns:
        BulkWriteResult: summary of the batch outcome
        """

    def initialize_ordered_bulk_op(self):
        """
        Create a builder for an ordered bulk operation (deprecated).

        NOTE(review): the legacy builder API is believed to have been removed
        in PyMongo 4.0 in favour of bulk_write - confirm against the driver
        version being targeted.

        Returns:
        BulkOperationBuilder: builder for the bulk operation
        """

    def initialize_unordered_bulk_op(self):
        """
        Create a builder for an unordered bulk operation (deprecated).

        NOTE(review): the legacy builder API is believed to have been removed
        in PyMongo 4.0 in favour of bulk_write - confirm against the driver
        version being targeted.

        Returns:
        BulkOperationBuilder: builder for the bulk operation
        """

Bulk Operation Types

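Individual operation types for bulk writes. IndexModel is also listed here, but it describes an index to create (it is passed to Collection.create_indexes) rather than a bulk_write request.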

class InsertOne:
    def __init__(self, document):
        """
        Describe the insertion of one document as a bulk write request.

        Parameters:
        - document: the document to insert
        """

class UpdateOne:
    def __init__(
        self,
        filter,
        update,
        upsert=False,
        collation=None,
        array_filters=None,
        hint=None,
    ):
        """
        Describe an update of a single document as a bulk write request.

        Parameters:
        - filter: query criteria selecting the document
        - update: update specification to apply
        - upsert: if True, insert when no document matches
        - collation: collation options
        - array_filters: filters for array updates
        - hint: index hint
        """

class UpdateMany:
    def __init__(
        self,
        filter,
        update,
        upsert=False,
        collation=None,
        array_filters=None,
        hint=None,
    ):
        """
        Describe an update of multiple documents as a bulk write request.

        Parameters:
        - filter: query criteria selecting the documents
        - update: update specification to apply
        - upsert: if True, insert when no document matches
        - collation: collation options
        - array_filters: filters for array updates
        - hint: index hint
        """

class ReplaceOne:
    def __init__(
        self,
        filter,
        replacement,
        upsert=False,
        collation=None,
        hint=None,
    ):
        """
        Describe the replacement of a single document as a bulk write request.

        Parameters:
        - filter: query criteria selecting the document
        - replacement: the replacement document
        - upsert: if True, insert when no document matches
        - collation: collation options
        - hint: index hint
        """

class DeleteOne:
    def __init__(self, filter, collation=None, hint=None):
        """
        Describe the deletion of a single document as a bulk write request.

        Parameters:
        - filter: query criteria selecting the document
        - collation: collation options
        - hint: index hint
        """

class DeleteMany:
    def __init__(self, filter, collation=None, hint=None):
        """
        Describe the deletion of multiple documents as a bulk write request.

        Parameters:
        - filter: query criteria selecting the documents
        - collation: collation options
        - hint: index hint
        """

class IndexModel:
    def __init__(self, keys, **kwargs):
        """
        Describe an index to be created.

        Everything other than keys is supplied as a keyword argument.

        Parameters:
        - keys: index key specification
        - name: name of the index
        - unique: enforce a unique constraint
        - background: build in the background (deprecated)
        - sparse: create a sparse index
        - expireAfterSeconds: TTL in seconds
        - partialFilterExpression: filter for a partial index
        - collation: collation options
        """

Bulk Write Results

Result objects for bulk operations.

class BulkWriteResult:
    @property
    def acknowledged(self):
        """
        Write acknowledgment status.

        Returns:
        bool: True if acknowledged
        """

    @property
    def inserted_count(self):
        """
        Number of inserted documents.

        Returns:
        int: Insert count
        """

    @property
    def matched_count(self):
        """
        Number of matched documents for updates.

        Returns:
        int: Match count
        """

    @property
    def modified_count(self):
        """
        Number of modified documents.

        Returns:
        int: Modification count
        """

    @property
    def deleted_count(self):
        """
        Number of deleted documents.

        Returns:
        int: Delete count
        """

    @property
    def upserted_count(self):
        """
        Number of upserted documents.

        Returns:
        int: Upsert count
        """

    @property
    def upserted_ids(self):
        """
        Mapping of operation index to upserted _id.

        Returns:
        dict: {index: _id} mapping
        """

    @property
    def bulk_api_result(self):
        """
        Raw result of the bulk operation, as read by the
        "Advanced Bulk Operations" usage example.

        Returns:
        dict: Raw bulk API result
        """

class BulkWriteError(OperationFailure):
    @property
    def details(self):
        """
        Full error information for the failed bulk write.

        Returns:
        dict: error details, including writeErrors and writeConcernErrors
        """

Transaction Support

Multi-document ACID transactions with session management.

class MongoClient:
    def start_session(
        self,
        causal_consistency=None,
        default_transaction_options=None,
        snapshot=False,
    ):
        """
        Open a new client session.

        The usage examples use the returned session as a context manager
        (``with client.start_session() as session:``).

        Parameters:
        - causal_consistency: turn on causal consistency
        - default_transaction_options: TransactionOptions used by default
          for transactions started on the session
        - snapshot: turn on snapshot reads

        Returns:
        ClientSession: session to pass to operations
        """

class ClientSession:
    def start_transaction(
        self,
        read_concern=None,
        write_concern=None,
        read_preference=None,
        max_commit_time_ms=None,
    ):
        """
        Begin a multi-document transaction on this session.

        The usage examples call this in a ``with`` statement, where the
        transaction commits automatically when the block exits.

        Parameters:
        - read_concern: read concern for the transaction
        - write_concern: write concern for the transaction
        - read_preference: read preference for the transaction
        - max_commit_time_ms: maximum commit time

        Raises:
        InvalidOperation: if a transaction has already been started
        """

    def commit_transaction(self):
        """
        Commit the transaction currently in progress.

        Raises:
        InvalidOperation: if there is no active transaction
        """

    def abort_transaction(self):
        """
        Abort the transaction currently in progress.

        Raises:
        InvalidOperation: if there is no active transaction
        """

    @property
    def in_transaction(self):
        """
        Whether this session currently has an active transaction.

        Returns:
        bool: True if a transaction is active
        """

    @property
    def has_ended(self):
        """
        Whether this session has been ended.

        Returns:
        bool: True if the session ended
        """

    def end_session(self):
        """Finish this session."""

    def with_transaction(
        self,
        callback,
        read_concern=None,
        write_concern=None,
        read_preference=None,
        max_commit_time_ms=None,
    ):
        """
        Run a callback inside a transaction, retrying automatically.

        Parameters:
        - callback: function to run in the transaction; the usage example
          defines it with a single ``session`` parameter
        - read_concern: read concern for the transaction
        - write_concern: write concern for the transaction
        - read_preference: read preference for the transaction
        - max_commit_time_ms: maximum commit time

        Returns:
        Any: whatever the callback returned
        """

Transaction Options

Configuration for transaction behavior.

class TransactionOptions:
    def __init__(
        self,
        read_concern=None,
        write_concern=None,
        read_preference=None,
        max_commit_time_ms=None,
    ):
        """
        Bundle the options that configure a transaction.

        Parameters:
        - read_concern: read concern to use for the transaction
        - write_concern: write concern to use for the transaction
        - read_preference: read preference to use for the transaction
        - max_commit_time_ms: maximum time allowed for the commit
        """

    @property
    def read_concern(self):
        """Read concern of the transaction."""

    @property
    def write_concern(self):
        """Write concern of the transaction."""

    @property
    def read_preference(self):
        """Read preference of the transaction."""

    @property
    def max_commit_time_ms(self):
        """Maximum commit time, in milliseconds."""

Usage Examples

Bulk Write Operations

from pymongo import MongoClient
from pymongo.errors import BulkWriteError
from pymongo.operations import InsertOne, UpdateOne, DeleteOne, ReplaceOne

client = MongoClient()
collection = client.mydb.mycollection

# Prepare bulk operations: each request object describes one write
requests = [
    InsertOne({"name": "Alice", "age": 25}),
    InsertOne({"name": "Bob", "age": 30}),
    UpdateOne({"name": "Charlie"}, {"$set": {"age": 35}}),
    ReplaceOne(
        {"name": "David"},
        {"name": "David", "age": 40, "status": "active"},
        upsert=True
    ),
    DeleteOne({"name": "Eve"})
]

# Execute bulk write (ordered by default, so it stops at the first error)
try:
    result = collection.bulk_write(requests)
    print(f"Inserted: {result.inserted_count}")
    print(f"Modified: {result.modified_count}")
    print(f"Deleted: {result.deleted_count}")
    print(f"Upserted: {result.upserted_count}")
    if result.upserted_ids:
        print(f"Upserted IDs: {result.upserted_ids}")
except BulkWriteError as e:
    # BulkWriteError must be imported from pymongo.errors for this handler
    # to work; e.details holds the writeErrors / writeConcernErrors.
    print(f"Bulk write error: {e.details}")

# Unordered bulk write (continues on errors)
result = collection.bulk_write(requests, ordered=False)

Transaction Usage

from pymongo import MongoClient
from pymongo.errors import PyMongoError

client = MongoClient()
db = client.mydb

# Basic transaction: both updates are applied together or not at all
with client.start_session() as session:
    with session.start_transaction():
        db.accounts.update_one(
            {"name": "Alice"},
            {"$inc": {"balance": -100}},
            session=session
        )
        db.accounts.update_one(
            {"name": "Bob"},
            {"$inc": {"balance": 100}},
            session=session
        )
        # Transaction commits automatically on context exit

# Manual transaction control
session = client.start_session()
try:
    session.start_transaction()

    # Perform operations (every call must pass session= to join the transaction)
    db.inventory.update_one(
        {"item": "widget", "qty": {"$gte": 5}},
        {"$inc": {"qty": -5}},
        session=session
    )
    db.orders.insert_one(
        {"item": "widget", "qty": 5, "status": "pending"},
        session=session
    )

    # Commit transaction
    session.commit_transaction()
    print("Transaction committed")

except PyMongoError as e:
    # abort_transaction raises InvalidOperation when no transaction is
    # active (for example when the failure came from commit_transaction
    # itself), which would hide the original error - so only abort while a
    # transaction is still in progress.
    if session.in_transaction:
        session.abort_transaction()
    print(f"Transaction aborted: {e}")
finally:
    # Always release the session, whether the transaction succeeded or not
    session.end_session()

Transaction with Callback

def transfer_funds(session):
    """
    Transfer 100 from Alice's balance to Bob's balance.

    Written as a callback for ClientSession.with_transaction: it is called
    with the session, and every operation passes session= so that it runs
    inside that transaction.

    Parameters:
    - session: session the transaction runs on; its client is used to reach
      the mydb.accounts collection

    Returns:
    str: "Transfer completed"

    Raises:
    ValueError: if the source account does not exist or its balance is
    below 100
    """
    # This function will be called within a transaction
    accounts = session.client.mydb.accounts

    # Check source account balance
    source = accounts.find_one({"name": "Alice"}, session=session)
    if source is None:
        # No matching document: fail with the same exception type as the
        # other business-rule check instead of a TypeError on None["balance"]
        raise ValueError("Source account not found")
    if source["balance"] < 100:
        raise ValueError("Insufficient funds")

    # Perform transfer
    accounts.update_one(
        {"name": "Alice"},
        {"$inc": {"balance": -100}},
        session=session
    )
    accounts.update_one(
        {"name": "Bob"},
        {"$inc": {"balance": 100}},
        session=session
    )

    return "Transfer completed"

# Run the callback in a transaction; transient errors are retried automatically
with client.start_session() as txn_session:
    try:
        outcome = txn_session.with_transaction(transfer_funds)
        print(outcome)
    except ValueError as err:
        # Raised by the callback itself, e.g. when the balance is too low
        print(f"Business logic error: {err}")

Advanced Bulk Operations

from pymongo.operations import UpdateMany, InsertOne
from pymongo.write_concern import WriteConcern

# Bulk operations with write concern
requests = [
    InsertOne({"category": "electronics", "price": 299.99}),
    UpdateMany(
        {"category": "electronics"},
        {"$mul": {"price": 0.9}},  # 10% discount
        hint="category_1"  # Use specific index
    ),
    UpdateMany(
        # NOTE(review): compares `created` against a string, so this assumes
        # the field is stored as an ISO-8601 string - use a datetime value
        # if the documents hold real dates.
        {"status": "pending", "created": {"$lt": "2023-01-01"}},
        {"$set": {"status": "expired"}},
        collation={"locale": "en", "strength": 2}
    )
]

# Execute with custom write concern. bulk_write takes no write-concern
# argument, so the write concern is bound to a collection handle with
# with_options and the batch is run through that handle.
# `collection` is the handle created in the first usage example.
majority_collection = collection.with_options(
    write_concern=WriteConcern(w="majority")
)
result = majority_collection.bulk_write(
    requests,
    ordered=True,
    bypass_document_validation=False
)

print(f"Bulk operation result: {result.bulk_api_result}")

Session with Read Preferences

from datetime import datetime, timezone

from pymongo import ReadPreference
from pymongo.client_session import TransactionOptions
from pymongo.read_concern import ReadConcern
from pymongo.write_concern import WriteConcern

# `client` and `collection` are the objects created in the first usage example.

# Start session with specific options
with client.start_session(
    causal_consistency=True,
    default_transaction_options=TransactionOptions(
        read_concern=ReadConcern("snapshot"),
        write_concern=WriteConcern(w="majority"),
        read_preference=ReadPreference.PRIMARY
    )
) as session:

    # This read runs outside a transaction, so the default transaction
    # options above do not apply to it; it uses the collection's own
    # settings and takes part in the session's causal consistency.
    docs = list(collection.find({}, session=session))

    # Transaction inherits default options
    with session.start_transaction():
        # Use a timezone-aware UTC timestamp: a naive datetime.now() is
        # local time, but the driver treats naive datetimes as UTC.
        collection.insert_one(
            {"timestamp": datetime.now(timezone.utc)},
            session=session
        )
        collection.update_many(
            {"processed": False},
            {"$set": {"processed": True}},
            session=session
        )

Install with Tessl CLI

npx tessl i tessl/pypi-pymongo

docs

advanced-queries.md

bson-handling.md

bulk-transactions.md

client-connection.md

database-collection.md

gridfs-storage.md

index.md

monitoring-events.md

tile.json