Motor — Non-blocking MongoDB driver for Python asyncio and Tornado applications

GridFS support for storing and retrieving large files in MongoDB. Motor provides comprehensive GridFS functionality with streaming operations, metadata management, and support for both the asyncio and Tornado frameworks.

The GridFS bucket classes below are the primary interface for GridFS operations, providing file upload, download, and management functionality.
# AsyncIO GridFS Bucket
class AsyncIOMotorGridFSBucket:
    """GridFS bucket interface for asyncio applications.

    Provides streaming upload/download of large files stored in the
    ``<bucket_name>.files`` / ``<bucket_name>.chunks`` collections,
    plus file management (delete, rename, find).
    """

    def __init__(
        self,
        database: AsyncIOMotorDatabase,
        bucket_name: str = 'fs',
        chunk_size_bytes: int = 261120,  # 255 KiB, the GridFS default chunk size
        write_concern=None,
        read_concern=None
    ):
        """
        Create a GridFS bucket.
        Parameters:
        - database: The database to use for GridFS
        - bucket_name: The bucket name (collection prefix)
        - chunk_size_bytes: Default chunk size for uploads
        - write_concern: Write concern for GridFS operations
        - read_concern: Read concern for GridFS operations
        """

    # Upload Operations
    async def upload_from_stream(
        self,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> Any:
        """Upload a file from a source stream.

        Returns the id of the newly created file document.
        """

    async def upload_from_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> None:
        """Upload a file with a specific ID."""

    def open_upload_stream(
        self,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> AsyncIOMotorGridIn:
        """Open an upload stream for writing."""

    def open_upload_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> AsyncIOMotorGridIn:
        """Open an upload stream with a specific ID."""

    # Download Operations
    async def download_to_stream(
        self,
        file_id: Any,
        destination,
        session=None
    ) -> None:
        """Download a file by ID to a destination stream."""

    async def download_to_stream_by_name(
        self,
        filename: str,
        destination,
        revision: int = -1,  # -1 selects the most recent revision
        session=None
    ) -> None:
        """Download a file by name to a destination stream."""

    async def open_download_stream(
        self,
        file_id: Any,
        session=None
    ) -> AsyncIOMotorGridOut:
        """Open a download stream by file ID."""

    async def open_download_stream_by_name(
        self,
        filename: str,
        revision: int = -1,
        session=None
    ) -> AsyncIOMotorGridOut:
        """Open a download stream by filename."""

    # File Management
    async def delete(self, file_id: Any, session=None) -> None:
        """Delete a file by ID."""

    async def rename(
        self,
        file_id: Any,
        new_filename: str,
        session=None
    ) -> None:
        """Rename a file."""

    def find(
        self,
        filter: Optional[Dict[str, Any]] = None,
        batch_size: int = 0,
        limit: int = 0,
        no_cursor_timeout: bool = False,
        skip: int = 0,
        sort: Optional[List[Tuple[str, int]]] = None,
        session=None
    ) -> AsyncIOMotorGridOutCursor:
        """Find files matching the filter."""
# Tornado GridFS Bucket
class MotorGridFSBucket:
    """GridFS bucket interface for Tornado applications.

    Mirrors AsyncIOMotorGridFSBucket; asynchronous operations return
    Tornado Futures instead of being coroutines.
    """

    def __init__(
        self,
        database: MotorDatabase,
        bucket_name: str = 'fs',
        chunk_size_bytes: int = 261120,  # 255 KiB, the GridFS default chunk size
        write_concern=None,
        read_concern=None
    ):
        """Create a GridFS bucket for Tornado applications."""

    # Upload Operations
    def upload_from_stream(
        self,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Upload a file from a source stream."""

    def upload_from_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Upload a file with a specific ID."""

    def open_upload_stream(
        self,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> MotorGridIn:
        """Open an upload stream for writing."""

    def open_upload_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> MotorGridIn:
        """Open an upload stream with a specific ID."""

    # Download Operations
    def download_to_stream(
        self,
        file_id: Any,
        destination,
        session=None
    ) -> tornado.concurrent.Future:
        """Download a file by ID to a destination stream."""

    def download_to_stream_by_name(
        self,
        filename: str,
        destination,
        revision: int = -1,  # -1 selects the most recent revision
        session=None
    ) -> tornado.concurrent.Future:
        """Download a file by name to a destination stream."""

    def open_download_stream(
        self,
        file_id: Any,
        session=None
    ) -> tornado.concurrent.Future:
        """Open a download stream by file ID."""

    def open_download_stream_by_name(
        self,
        filename: str,
        revision: int = -1,
        session=None
    ) -> tornado.concurrent.Future:
        """Open a download stream by filename."""

    # File Management
    def delete(self, file_id: Any, session=None) -> tornado.concurrent.Future:
        """Delete a file by ID."""

    def rename(
        self,
        file_id: Any,
        new_filename: str,
        session=None
    ) -> tornado.concurrent.Future:
        """Rename a file."""

    def find(
        self,
        filter: Optional[Dict[str, Any]] = None,
        batch_size: int = 0,
        limit: int = 0,
        no_cursor_timeout: bool = False,
        skip: int = 0,
        sort: Optional[List[Tuple[str, int]]] = None,
        session=None
    ) -> MotorGridOutCursor:
        """Find files matching the filter."""

# Stream interface for uploading files to GridFS with write operations and
# metadata management.
# AsyncIO Upload Stream
class AsyncIOMotorGridIn:
    """Writable stream for uploading one file to GridFS (asyncio).

    Obtained from AsyncIOMotorGridFSBucket.open_upload_stream(); write
    data, then close() to finalize the upload (or abort() to discard it).
    """

    # Properties (read-only after upload starts)
    @property
    def _id(self) -> Any:
        """The file's unique identifier."""

    @property
    def filename(self) -> str:
        """The file's name."""

    @property
    def name(self) -> str:
        """Alias for filename."""

    @property
    def content_type(self) -> Optional[str]:
        """The file's content type."""

    @property
    def length(self) -> int:
        """The current length of the file."""

    @property
    def chunk_size(self) -> int:
        """The chunk size for this file."""

    @property
    def upload_date(self) -> Optional[datetime.datetime]:
        """The upload date (available after close)."""

    @property
    def metadata(self) -> Optional[Dict[str, Any]]:
        """The file's metadata."""

    @property
    def closed(self) -> bool:
        """Whether the file is closed."""

    # Write Operations
    async def write(self, data: bytes) -> None:
        """Write data to the file."""

    async def writelines(self, lines: List[bytes]) -> None:
        """Write multiple lines to the file."""

    # Stream Management
    async def close(self) -> None:
        """Close and finalize the file upload."""

    async def abort(self) -> None:
        """Abort the upload and delete any chunks."""

    # Metadata Management
    async def set(self, name: str, value: Any) -> None:
        """Set a metadata field."""

    # Stream Properties
    def writable(self) -> bool:
        """Whether the stream is writable."""
# Tornado Upload Stream
class MotorGridIn:
    """Writable GridFS upload stream for Tornado.

    Same properties as AsyncIOMotorGridIn; asynchronous operations
    return Tornado Futures.
    """

    # Properties (identical to AsyncIO version)
    @property
    def _id(self) -> Any: ...
    @property
    def filename(self) -> str: ...
    @property
    def name(self) -> str: ...
    @property
    def content_type(self) -> Optional[str]: ...
    @property
    def length(self) -> int: ...
    @property
    def chunk_size(self) -> int: ...
    @property
    def upload_date(self) -> Optional[datetime.datetime]: ...
    @property
    def metadata(self) -> Optional[Dict[str, Any]]: ...
    @property
    def closed(self) -> bool: ...

    # Write Operations (return Tornado Futures)
    def write(self, data: bytes) -> tornado.concurrent.Future: ...
    def writelines(self, lines: List[bytes]) -> tornado.concurrent.Future: ...
    def close(self) -> tornado.concurrent.Future: ...
    def abort(self) -> tornado.concurrent.Future: ...
    def set(self, name: str, value: Any) -> tornado.concurrent.Future: ...
    def writable(self) -> bool: ...

# Stream interface for downloading files from GridFS with read operations and
# file metadata access.
# AsyncIO Download Stream
class AsyncIOMotorGridOut:
    """Readable stream for downloading one file from GridFS (asyncio).

    Call ``await open()`` before reading; the file's metadata
    properties are available after open().
    """

    # Properties (available after open())
    @property
    def _id(self) -> Any:
        """The file's unique identifier."""

    @property
    def filename(self) -> str:
        """The file's name."""

    @property
    def name(self) -> str:
        """Alias for filename."""

    @property
    def content_type(self) -> Optional[str]:
        """The file's content type."""

    @property
    def length(self) -> int:
        """The file's length in bytes."""

    @property
    def chunk_size(self) -> int:
        """The chunk size for this file."""

    @property
    def upload_date(self) -> datetime.datetime:
        """When the file was uploaded."""

    @property
    def metadata(self) -> Optional[Dict[str, Any]]:
        """The file's metadata."""

    @property
    def aliases(self) -> Optional[List[str]]:
        """The file's aliases (deprecated)."""

    # Read Operations
    async def open(self) -> AsyncIOMotorGridOut:
        """Open the file for reading (must be called first)."""

    async def read(self, size: int = -1) -> bytes:
        """Read up to size bytes from the file."""

    async def readchunk(self) -> bytes:
        """Read one chunk from the file."""

    async def readline(self, size: int = -1) -> bytes:
        """Read one line from the file."""

    # Stream Navigation
    def seek(self, pos: int, whence: int = 0) -> int:
        """Seek to a position in the file."""

    def tell(self) -> int:
        """Get the current position in the file."""

    def close(self) -> None:
        """Close the file."""

    # Stream Properties
    def readable(self) -> bool:
        """Whether the stream is readable."""

    def seekable(self) -> bool:
        """Whether the stream supports seeking."""
# Tornado Download Stream
class MotorGridOut:
    """Readable GridFS download stream for Tornado.

    Same properties as AsyncIOMotorGridOut; asynchronous read
    operations return Tornado Futures, navigation is synchronous.
    """

    # Properties (identical to AsyncIO version)
    @property
    def _id(self) -> Any: ...
    @property
    def filename(self) -> str: ...
    @property
    def name(self) -> str: ...
    @property
    def content_type(self) -> Optional[str]: ...
    @property
    def length(self) -> int: ...
    @property
    def chunk_size(self) -> int: ...
    @property
    def upload_date(self) -> datetime.datetime: ...
    @property
    def metadata(self) -> Optional[Dict[str, Any]]: ...
    @property
    def aliases(self) -> Optional[List[str]]: ...

    # Read Operations (return Tornado Futures)
    def open(self) -> tornado.concurrent.Future: ...
    def read(self, size: int = -1) -> tornado.concurrent.Future: ...
    def readchunk(self) -> tornado.concurrent.Future: ...
    def readline(self, size: int = -1) -> tornado.concurrent.Future: ...

    # Stream Navigation (synchronous)
    def seek(self, pos: int, whence: int = 0) -> int: ...
    def tell(self) -> int: ...
    def close(self) -> None: ...
    def readable(self) -> bool: ...
    def seekable(self) -> bool: ...

# Cursor for iterating over GridFS file metadata and accessing file
# information.
# AsyncIO GridFS Cursor
class AsyncIOMotorGridOutCursor:
    """Async cursor over GridFS files; iteration yields AsyncIOMotorGridOut."""

    def limit(self, limit: int) -> AsyncIOMotorGridOutCursor:
        """Limit the number of results."""

    def skip(self, skip: int) -> AsyncIOMotorGridOutCursor:
        """Skip a number of results."""

    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> AsyncIOMotorGridOutCursor:
        """Sort the results."""

    def batch_size(self, batch_size: int) -> AsyncIOMotorGridOutCursor:
        """Set the batch size."""

    async def to_list(self, length: Optional[int] = None) -> List[AsyncIOMotorGridOut]:
        """Convert cursor to a list."""

    def __aiter__(self) -> AsyncIOMotorGridOutCursor:
        """Async iterator protocol."""

    async def __anext__(self) -> AsyncIOMotorGridOut:
        """Get next file."""
# Tornado GridFS Cursor
class MotorGridOutCursor:
    """GridFS file cursor for Tornado; asynchronous results are Futures."""

    def limit(self, limit: int) -> MotorGridOutCursor: ...
    def skip(self, skip: int) -> MotorGridOutCursor: ...
    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> MotorGridOutCursor: ...
    def batch_size(self, batch_size: int) -> MotorGridOutCursor: ...
    def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...
    def next_object(self) -> tornado.concurrent.Future: ...

import asyncio
import motor.motor_asyncio
from io import BytesIO


async def gridfs_example():
    """Demonstrate basic GridFS upload and download with Motor's asyncio API."""
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    # Create GridFS bucket
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)
    # Upload from bytes
    file_data = b"Hello, GridFS world!"
    source = BytesIO(file_data)
    file_id = await bucket.upload_from_stream(
        "hello.txt",
        source,
        metadata={"type": "greeting", "author": "Motor"}
    )
    print(f"Uploaded file with ID: {file_id}")
    # Download to bytes
    destination = BytesIO()
    await bucket.download_to_stream(file_id, destination)
    downloaded_data = destination.getvalue()
    print(f"Downloaded: {downloaded_data.decode()}")
    # Stream upload
    upload_stream = bucket.open_upload_stream(
        "large_file.dat",
        metadata={"description": "Large file example"}
    )
    # Write data in chunks
    for i in range(10):
        chunk = f"Chunk {i}\n".encode()
        await upload_stream.write(chunk)
    await upload_stream.close()
    print(f"Uploaded large file with ID: {upload_stream._id}")
    # Stream download
    download_stream = await bucket.open_download_stream(upload_stream._id)
    # Read file information
    print(f"File: {download_stream.filename}")
    print(f"Size: {download_stream.length} bytes")
    print(f"Content Type: {download_stream.content_type}")
    print(f"Upload Date: {download_stream.upload_date}")
    print(f"Metadata: {download_stream.metadata}")
    # Read data
    data = await download_stream.read()
    print(f"Downloaded data: {data.decode()}")
    download_stream.close()
    client.close()


asyncio.run(gridfs_example())

import asyncio
import motor.motor_asyncio


async def file_management_example():
    """Demonstrate finding, renaming, and deleting GridFS files."""
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)
    # Find files
    cursor = bucket.find({"metadata.type": "image"})
    async for file_doc in cursor:
        print(f"Found file: {file_doc.filename} (ID: {file_doc._id})")
    # Find files with sorting and limiting
    cursor = bucket.find().sort("uploadDate", -1).limit(5)
    recent_files = await cursor.to_list(5)
    for file_doc in recent_files:
        print(f"Recent file: {file_doc.filename}")
    # Rename a file
    if recent_files:
        file_id = recent_files[0]._id
        await bucket.rename(file_id, "renamed_file.txt")
        print(f"Renamed file {file_id}")
    # Delete a file
    if len(recent_files) > 1:
        file_id = recent_files[1]._id
        await bucket.delete(file_id)
        print(f"Deleted file {file_id}")
    client.close()


asyncio.run(file_management_example())

import asyncio
import motor.motor_asyncio


async def large_file_streaming():
    """Demonstrate chunked upload/download of a large file with progress output."""
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)
    # Upload large file in chunks
    upload_stream = bucket.open_upload_stream(
        "video.mp4",
        chunk_size_bytes=1024*1024,  # 1MB chunks
        metadata={"type": "video", "codec": "h264"}
    )
    # Simulate large file upload
    total_size = 0
    for i in range(100):  # 100MB file simulation
        chunk = b"X" * 1024 * 1024  # 1MB of data
        await upload_stream.write(chunk)
        total_size += len(chunk)
        if i % 10 == 0:
            print(f"Uploaded {total_size / (1024*1024):.1f}MB")
    await upload_stream.close()
    print(f"Upload complete. File ID: {upload_stream._id}")
    # Stream download with progress
    download_stream = await bucket.open_download_stream(upload_stream._id)
    print(f"Downloading {download_stream.length / (1024*1024):.1f}MB file")
    downloaded_size = 0
    while downloaded_size < download_stream.length:
        chunk = await download_stream.readchunk()
        if not chunk:
            break
        downloaded_size += len(chunk)
        progress = (downloaded_size / download_stream.length) * 100
        if downloaded_size % (10 * 1024 * 1024) == 0:  # Every 10MB
            print(f"Downloaded {progress:.1f}%")
    download_stream.close()
    print("Download complete")
    client.close()


asyncio.run(large_file_streaming())

from typing import Any, Optional, Union, Dict, List, Tuple
from datetime import datetime
import io

# GridFS-specific type aliases used throughout this API reference.
GridFSFile = Dict[str, Any]  # GridFS file document
ChunkData = bytes  # File chunk data
FileId = Any  # GridFS file identifier (usually ObjectId)

# Install with Tessl CLI
npx tessl i tessl/pypi-motor