# Official Python driver for MongoDB, providing comprehensive tools for
# database operations, BSON handling, and GridFS file storage.
#
# GridFS support for storing and retrieving large files, including streaming
# operations and metadata management.
#
# Legacy GridFS interface for file storage operations.
class GridFS:
    """Legacy GridFS API for simple file storage (docstring-only stub;
    see GridFSBucket below for the modern, recommended interface)."""

    def __init__(self, database, collection="fs", disable_md5=False):
        """
        GridFS instance for file operations.
        Parameters:
        - database: Database instance
        - collection: GridFS collection prefix (default "fs")
        - disable_md5: disable MD5 checksum calculation
        """

    def new_file(self, **kwargs):
        """
        Create new GridFS file for writing.
        Parameters:
        - _id: file identifier
        - filename: file name
        - contentType: MIME content type
        - chunkSize: chunk size in bytes
        - metadata: custom metadata dictionary
        Returns:
        GridIn: File handle for writing
        """

    def put(self, data, **kwargs):
        """
        Store data as GridFS file.
        Parameters:
        - data: file data (bytes or file-like object)
        - kwargs: same as new_file()
        Returns:
        ObjectId: File identifier
        """

    def get(self, file_id, session=None):
        """
        Retrieve file by ID.
        Parameters:
        - file_id: file identifier
        - session: optional ClientSession
        Returns:
        GridOut: File handle for reading
        Raises:
        NoFile: if file not found
        """

    def get_version(self, filename=None, version=-1, session=None, **kwargs):
        """
        Retrieve file by filename and version.
        Parameters:
        - filename: file name
        - version: version number (-1 for latest)
        - session: optional ClientSession
        Returns:
        GridOut: File handle for reading
        Raises:
        NoFile: if file not found
        """

    def get_last_version(self, filename=None, session=None, **kwargs):
        """
        Retrieve latest version of file by filename.
        Parameters:
        - filename: file name
        - session: optional ClientSession
        Returns:
        GridOut: File handle for reading
        Raises:
        NoFile: if file not found
        """

    def delete(self, file_id, session=None):
        """
        Delete file by ID.
        Parameters:
        - file_id: file identifier
        - session: optional ClientSession
        Raises:
        NoFile: if file not found
        """

    def list(self, session=None):
        """
        List stored filenames.
        Parameters:
        - session: optional ClientSession
        Returns:
        list: List of filenames
        """

    def find_one(self, filter=None, session=None, *args, **kwargs):
        """
        Find single file by filter.
        Parameters:
        - filter: query criteria
        - session: optional ClientSession
        Returns:
        GridOut: File handle or None
        """

    def find(self, *args, **kwargs):
        """
        Find files matching criteria.
        Parameters:
        - filter: query criteria
        - skip: number of files to skip
        - limit: maximum number of files
        - sort: sort specification
        - session: optional ClientSession
        Returns:
        GridOutCursor: Cursor for files
        """

    def exists(self, document_or_id=None, session=None, **kwargs):
        """
        Check if file exists.
        Parameters:
        - document_or_id: file ID or query document
        - session: optional ClientSession
        Returns:
        bool: True if file exists
        """


# Modern GridFS interface with streaming support (recommended).
class GridFSBucket:
    """Modern GridFS bucket API with streaming upload/download support
    (docstring-only stub)."""

    def __init__(
        self,
        db,
        bucket_name="fs",
        chunk_size_bytes=DEFAULT_CHUNK_SIZE,
        write_concern=None,
        read_preference=None,
        disable_md5=False
    ):
        """
        GridFS bucket for file operations.
        Parameters:
        - db: Database instance
        - bucket_name: bucket name (default "fs")
        - chunk_size_bytes: default chunk size
        - write_concern: write concern for operations
        - read_preference: read preference for operations
        - disable_md5: disable MD5 checksum calculation
        """

    def open_upload_stream(
        self,
        filename,
        chunk_size_bytes=None,
        metadata=None,
        session=None
    ):
        """
        Open upload stream for writing file.
        Parameters:
        - filename: file name
        - chunk_size_bytes: chunk size override
        - metadata: custom metadata dictionary
        - session: optional ClientSession
        Returns:
        GridIn: Upload stream
        """

    def open_upload_stream_with_id(
        self,
        file_id,
        filename,
        chunk_size_bytes=None,
        metadata=None,
        session=None
    ):
        """
        Open upload stream with specific file ID.
        Parameters:
        - file_id: file identifier
        - filename: file name
        - chunk_size_bytes: chunk size override
        - metadata: custom metadata dictionary
        - session: optional ClientSession
        Returns:
        GridIn: Upload stream
        """

    def upload_from_stream(
        self,
        filename,
        source,
        chunk_size_bytes=None,
        metadata=None,
        session=None
    ):
        """
        Upload file from stream.
        Parameters:
        - filename: file name
        - source: readable file-like object
        - chunk_size_bytes: chunk size override
        - metadata: custom metadata dictionary
        - session: optional ClientSession
        Returns:
        ObjectId: File identifier
        """

    def upload_from_stream_with_id(
        self,
        file_id,
        filename,
        source,
        chunk_size_bytes=None,
        metadata=None,
        session=None
    ):
        """
        Upload file from stream with specific ID.
        Parameters:
        - file_id: file identifier
        - filename: file name
        - source: readable file-like object
        - chunk_size_bytes: chunk size override
        - metadata: custom metadata dictionary
        - session: optional ClientSession
        """

    def open_download_stream(self, file_id, session=None):
        """
        Open download stream by file ID.
        Parameters:
        - file_id: file identifier
        - session: optional ClientSession
        Returns:
        GridOut: Download stream
        Raises:
        NoFile: if file not found
        """

    def download_to_stream(self, file_id, destination, session=None):
        """
        Download file to stream by ID.
        Parameters:
        - file_id: file identifier
        - destination: writable file-like object
        - session: optional ClientSession
        Raises:
        NoFile: if file not found
        """

    def delete(self, file_id, session=None):
        """
        Delete file by ID.
        Parameters:
        - file_id: file identifier
        - session: optional ClientSession
        Raises:
        NoFile: if file not found
        """

    def find(self, filter=None, session=None, **kwargs):
        """
        Find files matching criteria.
        Parameters:
        - filter: query criteria for files collection
        - batch_size: cursor batch size
        - limit: maximum number of files
        - skip: number of files to skip
        - sort: sort specification
        - session: optional ClientSession
        Returns:
        GridOutCursor: Cursor for files
        """

    def open_download_stream_by_name(
        self,
        filename,
        revision=-1,
        session=None
    ):
        """
        Open download stream by filename.
        Parameters:
        - filename: file name
        - revision: file revision (-1 for latest)
        - session: optional ClientSession
        Returns:
        GridOut: Download stream
        Raises:
        NoFile: if file not found
        """

    def download_to_stream_by_name(
        self,
        filename,
        destination,
        revision=-1,
        session=None
    ):
        """
        Download file to stream by name.
        Parameters:
        - filename: file name
        - destination: writable file-like object
        - revision: file revision (-1 for latest)
        - session: optional ClientSession
        Raises:
        NoFile: if file not found
        """

    def rename(self, file_id, new_filename, session=None):
        """
        Rename file.
        Parameters:
        - file_id: file identifier
        - new_filename: new file name
        - session: optional ClientSession
        Raises:
        NoFile: if file not found
        """


# File objects for reading and writing GridFS files.
class GridIn:
    """File object for writing data into GridFS (docstring-only stub)."""

    def __init__(self, root_collection, session=None, disable_md5=False, **kwargs):
        """
        GridFS file for writing.
        Parameters:
        - root_collection: GridFS root collection
        - session: optional ClientSession
        - disable_md5: disable MD5 calculation
        - kwargs: file metadata
        """

    def write(self, data):
        """
        Write data to file.
        Parameters:
        - data: bytes to write
        """

    def writelines(self, lines):
        """
        Write sequence of bytes.
        Parameters:
        - lines: sequence of bytes
        """

    def close(self):
        """Close file and finalize upload."""

    def abort(self):
        """Abort upload and delete partial file."""

    @property
    def closed(self):
        """
        Check if file is closed.
        Returns:
        bool: True if closed
        """

    @property
    def _id(self):
        """
        File identifier.
        Returns:
        ObjectId: File ID
        """

    @property
    def filename(self):
        """
        File name.
        Returns:
        str: File name
        """

    @property
    def length(self):
        """
        File size in bytes.
        Returns:
        int: File size
        """

    @property
    def chunk_size(self):
        """
        Chunk size in bytes.
        Returns:
        int: Chunk size
        """

    @property
    def upload_date(self):
        """
        Upload completion timestamp.
        Returns:
        datetime: Upload date
        """

    @property
    def md5(self):
        """
        MD5 checksum (if enabled).
        Returns:
        str: MD5 hash or None
        """

    @property
    def metadata(self):
        """
        Custom metadata.
        Returns:
        dict: Metadata dictionary
        """
class GridOut:
    """File object for reading data out of GridFS (docstring-only stub)."""

    def __init__(self, root_collection, file_id=None, file_document=None, session=None):
        """
        GridFS file for reading.
        Parameters:
        - root_collection: GridFS root collection
        - file_id: file identifier
        - file_document: file document
        - session: optional ClientSession
        """

    def read(self, size=-1):
        """
        Read data from file.
        Parameters:
        - size: bytes to read (-1 for all)
        Returns:
        bytes: File data
        """

    def readline(self, size=-1):
        """
        Read line from file.
        Parameters:
        - size: maximum bytes to read
        Returns:
        bytes: Line data
        """

    def readlines(self):
        """
        Read all lines from file.
        Returns:
        list: List of lines as bytes
        """

    def seek(self, pos, whence=0):
        """
        Seek to file position.
        Parameters:
        - pos: position
        - whence: seek mode (0=absolute, 1=relative, 2=from end)
        """

    def tell(self):
        """
        Get current file position.
        Returns:
        int: Current position
        """

    def close(self):
        """Close file."""

    def __iter__(self):
        """Iterate over file lines."""

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Context manager exit."""

    # Same properties as GridIn
    @property
    def _id(self): ...

    @property
    def filename(self): ...

    @property
    def length(self): ...

    @property
    def chunk_size(self): ...

    @property
    def upload_date(self): ...

    @property
    def md5(self): ...

    @property
    def metadata(self): ...
class GridOutCursor:
    """Cursor over GridFS file documents (docstring-only stub)."""

    def __init__(self, collection, filter=None, session=None, **kwargs):
        """
        Cursor for GridFS files.
        Parameters:
        - collection: files collection
        - filter: query criteria
        - session: optional ClientSession
        - kwargs: cursor options
        """

    def __iter__(self):
        """Iterate over files."""

    def __next__(self):
        """Get next file."""

    def next(self):
        """Get next file (Python 2 compatibility)."""

    def clone(self):
        """Clone cursor."""

    def count(self):
        """
        Count matching files.
        Returns:
        int: File count
        """


# GridFS-related constants and error handling.
# Default GridFS chunk size: 255 KB, per the GridFS specification.
# Was annotation-only (never bound), which would make any use of the
# name raise NameError; bind the documented value.
DEFAULT_CHUNK_SIZE: int = 255 * 1024
class NoFile(Exception):
    """Raised when GridFS file is not found."""


# Example: basic legacy-GridFS usage.
from pymongo import MongoClient
import gridfs
from io import BytesIO

client = MongoClient()
db = client.mydb
fs = gridfs.GridFS(db)

# Store a file
with open("image.jpg", "rb") as f:
    file_id = fs.put(f, filename="profile.jpg", contentType="image/jpeg")
print(f"Stored file with ID: {file_id}")

# Retrieve a file
grid_out = fs.get(file_id)
with open("downloaded.jpg", "wb") as f:
    f.write(grid_out.read())
print(f"Downloaded {grid_out.filename}, size: {grid_out.length} bytes")

# Store with metadata
file_id = fs.put(
    b"Hello, GridFS!",
    filename="greeting.txt",
    contentType="text/plain",
    metadata={"author": "Alice", "tags": ["greeting", "sample"]}
)

# Find and list files
for grid_file in fs.find({"metadata.author": "Alice"}):
    print(f"File: {grid_file.filename}, Author: {grid_file.metadata['author']}")

# Delete a file
fs.delete(file_id)

# Example: GridFSBucket streaming usage.
from pymongo import MongoClient
import gridfs
from io import BytesIO

client = MongoClient()
db = client.mydb
bucket = gridfs.GridFSBucket(db, bucket_name="images")

# Upload from stream
with open("photo.jpg", "rb") as f:
    file_id = bucket.upload_from_stream(
        "user_photo.jpg",
        f,
        metadata={"user_id": 12345, "category": "profile"}
    )
print(f"Uploaded photo with ID: {file_id}")

# Download to stream
with open("downloaded_photo.jpg", "wb") as f:
    bucket.download_to_stream(file_id, f)

# Upload with custom chunk size for large files
with open("video.mp4", "rb") as f:
    file_id = bucket.upload_from_stream(
        "presentation.mp4",
        f,
        chunk_size_bytes=1024*1024,  # 1MB chunks
        metadata={"duration": 1800, "resolution": "1080p"}
    )

# Stream processing
upload_stream = bucket.open_upload_stream(
    "processed_data.csv",
    metadata={"processing_date": "2023-06-01"}
)
# Write data in chunks
# NOTE(review): process_large_dataset() is not defined in this snippet —
# presumably an application-level generator of text chunks.
for chunk in process_large_dataset():
    upload_stream.write(chunk.encode())
upload_stream.close()
print(f"Processed file ID: {upload_stream._id}")

# Example: advanced legacy-GridFS queries.
import gridfs
from bson import ObjectId
from datetime import datetime

# Custom GridFS collection
# NOTE(review): `db` comes from the previous examples; not defined here.
fs = gridfs.GridFS(db, collection="documents")

# Store with specific file ID
custom_id = ObjectId()
fs.put(
    b"Important document content",
    _id=custom_id,
    filename="contract.pdf",
    contentType="application/pdf",
    metadata={
        "department": "legal",
        "confidential": True,
        "expires": datetime(2025, 12, 31)
    }
)

# Find files with complex queries
large_images = fs.find({
    "contentType": {"$regex": "^image/"},
    "length": {"$gt": 1024*1024},  # > 1MB
    "uploadDate": {"$gte": datetime(2023, 1, 1)}
}).sort("uploadDate", -1)
for img in large_images:
    print(f"Large image: {img.filename}, {img.length/1024/1024:.1f}MB")

# Version management by filename
versions = list(fs.find({"filename": "document.txt"}).sort("uploadDate", 1))
print(f"Found {len(versions)} versions of document.txt")

# Get latest version
latest = fs.get_last_version("document.txt")
print(f"Latest version uploaded: {latest.upload_date}")

# Stream reading
# NOTE(review): `file_id` and `process_chunk` come from earlier examples /
# application code; they are not defined in this snippet.
grid_out = fs.get(file_id)
while True:
    chunk = grid_out.read(8192)  # Read 8KB chunks
    if not chunk:
        break
    process_chunk(chunk)
grid_out.close()

# Example: GridFS operations inside transactions.
import gridfs
from pymongo.errors import PyMongoError
from pymongo import MongoClient              # added: MongoClient is used below
from datetime import datetime, timedelta     # added: datetime/timedelta are used below

client = MongoClient()
db = client.mydb
bucket = gridfs.GridFSBucket(db)

# GridFS operations in transaction
with client.start_session() as session:
    with session.start_transaction():
        try:
            # Upload file
            with open("data.json", "rb") as f:
                file_id = bucket.upload_from_stream(
                    "backup.json",
                    f,
                    session=session
                )
            # Update metadata in related collection
            db.backups.insert_one({
                "file_id": file_id,
                "created_date": datetime.now(),
                "status": "completed"
            }, session=session)
            print("Backup created successfully")
        except PyMongoError as e:
            print(f"Backup failed: {e}")
            raise  # Will abort transaction

# Cleanup old backups
def cleanup_old_backups(session):
    """Remove backups older than 30 days."""
    cutoff_date = datetime.now() - timedelta(days=30)
    old_backups = db.backups.find(
        {"created_date": {"$lt": cutoff_date}},
        session=session
    )
    for backup in old_backups:
        # Delete GridFS file
        bucket.delete(backup["file_id"], session=session)
        # Delete metadata
        db.backups.delete_one({"_id": backup["_id"]}, session=session)

# Run cleanup in transaction
# (with_transaction passes the session as the callback's sole argument)
with client.start_session() as session:
    session.with_transaction(cleanup_old_backups)

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-pymongo