CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pymongo

Official Python driver for MongoDB providing comprehensive tools for database operations, BSON handling, and GridFS file storage

Pending
Overview
Eval results
Files

gridfs-storage.mddocs/

GridFS File Storage

GridFS support for storing and retrieving large files, including streaming operations and metadata management.

Capabilities

GridFS Interface

Legacy GridFS interface for file storage operations.

class GridFS:
    def __init__(self, database, collection="fs", disable_md5=False):
        """
        GridFS instance for file operations.

        Parameters:
        - database: Database instance
        - collection: GridFS collection prefix (default "fs")
        - disable_md5: disable MD5 checksum calculation
        """

    def new_file(self, **kwargs):
        """
        Create new GridFS file for writing.

        Parameters:
        - _id: file identifier
        - filename: file name
        - contentType: MIME content type
        - chunkSize: chunk size in bytes
        - metadata: custom metadata dictionary

        Returns:
        GridIn: File handle for writing
        """

    def put(self, data, **kwargs):
        """
        Store data as GridFS file.

        Parameters:
        - data: file data (bytes or file-like object)
        - kwargs: same as new_file()

        Returns:
        ObjectId: File identifier
        """

    def get(self, file_id, session=None):
        """
        Retrieve file by ID.

        Parameters:
        - file_id: file identifier
        - session: optional ClientSession

        Returns:
        GridOut: File handle for reading

        Raises:
        NoFile: if file not found
        """

    def get_version(self, filename=None, version=-1, session=None, **kwargs):
        """
        Retrieve file by filename and version.

        Parameters:
        - filename: file name
        - version: version number (-1 for latest)
        - session: optional ClientSession

        Returns:
        GridOut: File handle for reading

        Raises:
        NoFile: if file not found
        """

    def get_last_version(self, filename=None, session=None, **kwargs):
        """
        Retrieve latest version of file by filename.

        Parameters:
        - filename: file name
        - session: optional ClientSession

        Returns:
        GridOut: File handle for reading

        Raises:
        NoFile: if file not found
        """

    def delete(self, file_id, session=None):
        """
        Delete file by ID.

        Parameters:
        - file_id: file identifier
        - session: optional ClientSession

        Raises:
        NoFile: if file not found
        """

    def list(self, session=None):
        """
        List stored filenames.

        Parameters:
        - session: optional ClientSession

        Returns:
        list: List of filenames
        """

    def find_one(self, filter=None, session=None, *args, **kwargs):
        """
        Find single file by filter.

        Parameters:
        - filter: query criteria
        - session: optional ClientSession

        Returns:
        GridOut: File handle or None
        """

    def find(self, *args, **kwargs):
        """
        Find files matching criteria.

        Parameters:
        - filter: query criteria
        - skip: number of files to skip
        - limit: maximum number of files
        - sort: sort specification
        - session: optional ClientSession

        Returns:
        GridOutCursor: Cursor for files
        """

    def exists(self, document_or_id=None, session=None, **kwargs):
        """
        Check if file exists.

        Parameters:
        - document_or_id: file ID or query document
        - session: optional ClientSession

        Returns:
        bool: True if file exists
        """

GridFSBucket Interface

Modern GridFS interface with streaming support (recommended).

class GridFSBucket:
    def __init__(
        self,
        db,
        bucket_name="fs",
        chunk_size_bytes=DEFAULT_CHUNK_SIZE,
        write_concern=None,
        read_preference=None,
        disable_md5=False
    ):
        """
        GridFS bucket for file operations.

        Parameters:
        - db: Database instance
        - bucket_name: bucket name (default "fs")
        - chunk_size_bytes: default chunk size
        - write_concern: write concern for operations
        - read_preference: read preference for operations
        - disable_md5: disable MD5 checksum calculation
        """

    def open_upload_stream(
        self,
        filename,
        chunk_size_bytes=None,
        metadata=None,
        session=None
    ):
        """
        Open upload stream for writing file.

        Parameters:
        - filename: file name
        - chunk_size_bytes: chunk size override
        - metadata: custom metadata dictionary
        - session: optional ClientSession

        Returns:
        GridIn: Upload stream
        """

    def open_upload_stream_with_id(
        self,
        file_id,
        filename,
        chunk_size_bytes=None,
        metadata=None,
        session=None
    ):
        """
        Open upload stream with specific file ID.

        Parameters:
        - file_id: file identifier
        - filename: file name
        - chunk_size_bytes: chunk size override
        - metadata: custom metadata dictionary
        - session: optional ClientSession

        Returns:
        GridIn: Upload stream
        """

    def upload_from_stream(
        self,
        filename,
        source,
        chunk_size_bytes=None,
        metadata=None,
        session=None
    ):
        """
        Upload file from stream.

        Parameters:
        - filename: file name
        - source: readable file-like object
        - chunk_size_bytes: chunk size override
        - metadata: custom metadata dictionary
        - session: optional ClientSession

        Returns:
        ObjectId: File identifier
        """

    def upload_from_stream_with_id(
        self,
        file_id,
        filename,
        source,
        chunk_size_bytes=None,
        metadata=None,
        session=None
    ):
        """
        Upload file from stream with specific ID.

        Parameters:
        - file_id: file identifier
        - filename: file name
        - source: readable file-like object
        - chunk_size_bytes: chunk size override
        - metadata: custom metadata dictionary
        - session: optional ClientSession
        """

    def open_download_stream(self, file_id, session=None):
        """
        Open download stream by file ID.

        Parameters:
        - file_id: file identifier
        - session: optional ClientSession

        Returns:
        GridOut: Download stream

        Raises:
        NoFile: if file not found
        """

    def download_to_stream(self, file_id, destination, session=None):
        """
        Download file to stream by ID.

        Parameters:
        - file_id: file identifier
        - destination: writable file-like object
        - session: optional ClientSession

        Raises:
        NoFile: if file not found
        """

    def delete(self, file_id, session=None):
        """
        Delete file by ID.

        Parameters:
        - file_id: file identifier
        - session: optional ClientSession

        Raises:
        NoFile: if file not found
        """

    def find(self, filter=None, session=None, **kwargs):
        """
        Find files matching criteria.

        Parameters:
        - filter: query criteria for files collection
        - batch_size: cursor batch size
        - limit: maximum number of files
        - skip: number of files to skip
        - sort: sort specification
        - session: optional ClientSession

        Returns:
        GridOutCursor: Cursor for files
        """

    def open_download_stream_by_name(
        self,
        filename,
        revision=-1,
        session=None
    ):
        """
        Open download stream by filename.

        Parameters:
        - filename: file name
        - revision: file revision (-1 for latest)
        - session: optional ClientSession

        Returns:
        GridOut: Download stream

        Raises:
        NoFile: if file not found
        """

    def download_to_stream_by_name(
        self,
        filename,
        destination,
        revision=-1,
        session=None
    ):
        """
        Download file to stream by name.

        Parameters:
        - filename: file name
        - destination: writable file-like object
        - revision: file revision (-1 for latest)
        - session: optional ClientSession

        Raises:
        NoFile: if file not found
        """

    def rename(self, file_id, new_filename, session=None):
        """
        Rename file.

        Parameters:
        - file_id: file identifier
        - new_filename: new file name
        - session: optional ClientSession

        Raises:
        NoFile: if file not found
        """

GridFS File Objects

File objects for reading and writing GridFS files.

class GridIn:
    def __init__(self, root_collection, session=None, disable_md5=False, **kwargs):
        """
        GridFS file for writing.

        Parameters:
        - root_collection: GridFS root collection
        - session: optional ClientSession
        - disable_md5: disable MD5 calculation
        - kwargs: file metadata
        """

    def write(self, data):
        """
        Write data to file.

        Parameters:
        - data: bytes to write
        """

    def writelines(self, lines):
        """
        Write sequence of bytes.

        Parameters:
        - lines: sequence of bytes
        """

    def close(self):
        """Close file and finalize upload."""

    def abort(self):
        """Abort upload and delete partial file."""

    @property
    def closed(self):
        """
        Check if file is closed.

        Returns:
        bool: True if closed
        """

    @property
    def _id(self):
        """
        File identifier.

        Returns:
        ObjectId: File ID
        """

    @property
    def filename(self):
        """
        File name.

        Returns:
        str: File name
        """

    @property
    def length(self):
        """
        File size in bytes.

        Returns:
        int: File size
        """

    @property
    def chunk_size(self):
        """
        Chunk size in bytes.

        Returns:
        int: Chunk size
        """

    @property
    def upload_date(self):
        """
        Upload completion timestamp.

        Returns:
        datetime: Upload date
        """

    @property
    def md5(self):
        """
        MD5 checksum (if enabled).

        Returns:
        str: MD5 hash or None
        """

    @property
    def metadata(self):
        """
        Custom metadata.

        Returns:
        dict: Metadata dictionary
        """

class GridOut:
    def __init__(self, root_collection, file_id=None, file_document=None, session=None):
        """
        GridFS file for reading.

        Parameters:
        - root_collection: GridFS root collection
        - file_id: file identifier
        - file_document: file document
        - session: optional ClientSession
        """

    def read(self, size=-1):
        """
        Read data from file.

        Parameters:
        - size: bytes to read (-1 for all)

        Returns:
        bytes: File data
        """

    def readline(self, size=-1):
        """
        Read line from file.

        Parameters:
        - size: maximum bytes to read

        Returns:
        bytes: Line data
        """

    def readlines(self):
        """
        Read all lines from file.

        Returns:
        list: List of lines as bytes
        """

    def seek(self, pos, whence=0):
        """
        Seek to file position.

        Parameters:
        - pos: position
        - whence: seek mode (0=absolute, 1=relative, 2=from end)
        """

    def tell(self):
        """
        Get current file position.

        Returns:
        int: Current position
        """

    def close(self):
        """Close file."""

    def __iter__(self):
        """Iterate over file lines."""

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""

    # Same properties as GridIn
    @property
    def _id(self): ...
    @property
    def filename(self): ...
    @property
    def length(self): ...
    @property
    def chunk_size(self): ...
    @property
    def upload_date(self): ...
    @property
    def md5(self): ...
    @property
    def metadata(self): ...

class GridOutCursor:
    def __init__(self, collection, filter=None, session=None, **kwargs):
        """
        Cursor for GridFS files.

        Parameters:
        - collection: files collection
        - filter: query criteria
        - session: optional ClientSession
        - kwargs: cursor options
        """

    def __iter__(self):
        """Iterate over files."""

    def __next__(self):
        """Get next file."""

    def next(self):
        """Get next file (Python 2 compatibility)."""

    def clone(self):
        """Clone cursor."""

    def count(self):
        """
        Count matching files.

        Returns:
        int: File count
        """

Constants and Exceptions

GridFS-related constants and error handling.

DEFAULT_CHUNK_SIZE: int  # Default chunk size (255KB)

class NoFile(Exception):
    """Raised when GridFS file is not found."""

Usage Examples

Basic GridFS Operations

from pymongo import MongoClient
import gridfs
from io import BytesIO

client = MongoClient()
db = client.mydb
fs = gridfs.GridFS(db)

# Store a file
with open("image.jpg", "rb") as f:
    file_id = fs.put(f, filename="profile.jpg", contentType="image/jpeg")
print(f"Stored file with ID: {file_id}")

# Retrieve a file
grid_out = fs.get(file_id)
with open("downloaded.jpg", "wb") as f:
    f.write(grid_out.read())

print(f"Downloaded {grid_out.filename}, size: {grid_out.length} bytes")

# Store with metadata
file_id = fs.put(
    b"Hello, GridFS!",
    filename="greeting.txt",
    contentType="text/plain",
    metadata={"author": "Alice", "tags": ["greeting", "sample"]}
)

# Find and list files
for grid_file in fs.find({"metadata.author": "Alice"}):
    print(f"File: {grid_file.filename}, Author: {grid_file.metadata['author']}")

# Delete a file
fs.delete(file_id)

GridFSBucket Operations (Recommended)

from pymongo import MongoClient
import gridfs
from io import BytesIO

client = MongoClient()
db = client.mydb
bucket = gridfs.GridFSBucket(db, bucket_name="images")

# Upload from stream
with open("photo.jpg", "rb") as f:
    file_id = bucket.upload_from_stream(
        "user_photo.jpg",
        f,
        metadata={"user_id": 12345, "category": "profile"}
    )

print(f"Uploaded photo with ID: {file_id}")

# Download to stream
with open("downloaded_photo.jpg", "wb") as f:
    bucket.download_to_stream(file_id, f)

# Upload with custom chunk size for large files
with open("video.mp4", "rb") as f:
    file_id = bucket.upload_from_stream(
        "presentation.mp4",
        f,
        chunk_size_bytes=1024*1024,  # 1MB chunks
        metadata={"duration": 1800, "resolution": "1080p"}
    )

# Stream processing
upload_stream = bucket.open_upload_stream(
    "processed_data.csv",
    metadata={"processing_date": "2023-06-01"}
)

# Write data in chunks
for chunk in process_large_dataset():
    upload_stream.write(chunk.encode())

upload_stream.close()
print(f"Processed file ID: {upload_stream._id}")

Advanced GridFS Usage

import gridfs
from bson import ObjectId
from datetime import datetime

# Custom GridFS collection
fs = gridfs.GridFS(db, collection="documents")

# Store with specific file ID
custom_id = ObjectId()
fs.put(
    b"Important document content",
    _id=custom_id,
    filename="contract.pdf",
    contentType="application/pdf",
    metadata={
        "department": "legal",
        "confidential": True,
        "expires": datetime(2025, 12, 31)
    }
)

# Find files with complex queries
large_images = fs.find({
    "contentType": {"$regex": "^image/"},
    "length": {"$gt": 1024*1024},  # > 1MB
    "uploadDate": {"$gte": datetime(2023, 1, 1)}
}).sort("uploadDate", -1)

for img in large_images:
    print(f"Large image: {img.filename}, {img.length/1024/1024:.1f}MB")

# Version management by filename
versions = list(fs.find({"filename": "document.txt"}).sort("uploadDate", 1))
print(f"Found {len(versions)} versions of document.txt")

# Get latest version
latest = fs.get_last_version("document.txt")
print(f"Latest version uploaded: {latest.upload_date}")

# Stream reading
grid_out = fs.get(file_id)
while True:
    chunk = grid_out.read(8192)  # Read 8KB chunks
    if not chunk:
        break
    process_chunk(chunk)
grid_out.close()

GridFS with Transactions

import gridfs
from pymongo.errors import PyMongoError

client = MongoClient()
db = client.mydb
bucket = gridfs.GridFSBucket(db)

# GridFS operations in transaction
with client.start_session() as session:
    with session.start_transaction():
        try:
            # Upload file
            with open("data.json", "rb") as f:
                file_id = bucket.upload_from_stream(
                    "backup.json",
                    f,
                    session=session
                )
            
            # Update metadata in related collection
            db.backups.insert_one({
                "file_id": file_id,
                "created_date": datetime.now(),
                "status": "completed"
            }, session=session)
            
            print("Backup created successfully")
            
        except PyMongoError as e:
            print(f"Backup failed: {e}")
            raise  # Will abort transaction

# Cleanup old backups
def cleanup_old_backups(session):
    """Remove backups older than 30 days."""
    cutoff_date = datetime.now() - timedelta(days=30)
    
    old_backups = db.backups.find(
        {"created_date": {"$lt": cutoff_date}},
        session=session
    )
    
    for backup in old_backups:
        # Delete GridFS file
        bucket.delete(backup["file_id"], session=session)
        # Delete metadata
        db.backups.delete_one({"_id": backup["_id"]}, session=session)

# Run cleanup in transaction
with client.start_session() as session:
    session.with_transaction(cleanup_old_backups)

Install with Tessl CLI

npx tessl i tessl/pypi-pymongo

docs

advanced-queries.md

bson-handling.md

bulk-transactions.md

client-connection.md

database-collection.md

gridfs-storage.md

index.md

monitoring-events.md

tile.json