CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-motor

Non-blocking MongoDB driver for Python asyncio and Tornado applications

Pending
Overview
Eval results
Files

gridfs-operations.mddocs/

GridFS Operations

GridFS support for storing and retrieving large files in MongoDB. Motor provides comprehensive GridFS functionality with streaming operations, metadata management, and both asyncio and Tornado framework support.

Capabilities

GridFS Bucket

The primary interface for GridFS operations, providing file upload, download, and management functionality.

# AsyncIO GridFS Bucket
class AsyncIOMotorGridFSBucket:
    def __init__(
        self,
        database: AsyncIOMotorDatabase,
        bucket_name: str = 'fs',
        chunk_size_bytes: int = 261120,
        write_concern=None,
        read_concern=None
    ):
        """
        Create a GridFS bucket.
        
        Parameters:
        - database: The database to use for GridFS
        - bucket_name: The bucket name (collection prefix)
        - chunk_size_bytes: Default chunk size for uploads
        - write_concern: Write concern for GridFS operations
        - read_concern: Read concern for GridFS operations
        """
    
    # Upload Operations
    async def upload_from_stream(
        self,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> Any:
        """Upload a file from a source stream."""
    
    async def upload_from_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> None:
        """Upload a file with a specific ID."""
    
    def open_upload_stream(
        self,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> AsyncIOMotorGridIn:
        """Open an upload stream for writing."""
    
    def open_upload_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> AsyncIOMotorGridIn:
        """Open an upload stream with a specific ID."""
    
    # Download Operations
    async def download_to_stream(
        self,
        file_id: Any,
        destination,
        session=None
    ) -> None:
        """Download a file by ID to a destination stream."""
    
    async def download_to_stream_by_name(
        self,
        filename: str,
        destination,
        revision: int = -1,
        session=None
    ) -> None:
        """Download a file by name to a destination stream."""
    
    async def open_download_stream(
        self,
        file_id: Any,
        session=None
    ) -> AsyncIOMotorGridOut:
        """Open a download stream by file ID."""
    
    async def open_download_stream_by_name(
        self,
        filename: str,
        revision: int = -1,
        session=None
    ) -> AsyncIOMotorGridOut:
        """Open a download stream by filename."""
    
    # File Management
    async def delete(self, file_id: Any, session=None) -> None:
        """Delete a file by ID."""
    
    async def rename(
        self,
        file_id: Any,
        new_filename: str,
        session=None
    ) -> None:
        """Rename a file."""
    
    def find(
        self,
        filter: Optional[Dict[str, Any]] = None,
        batch_size: int = 0,
        limit: int = 0,
        no_cursor_timeout: bool = False,
        skip: int = 0,
        sort: Optional[List[Tuple[str, int]]] = None,
        session=None
    ) -> AsyncIOMotorGridOutCursor:
        """Find files matching the filter."""

# Tornado GridFS Bucket
class MotorGridFSBucket:
    def __init__(
        self,
        database: MotorDatabase,
        bucket_name: str = 'fs',
        chunk_size_bytes: int = 261120,
        write_concern=None,
        read_concern=None
    ):
        """Create a GridFS bucket for Tornado applications."""
    
    # Upload Operations  
    def upload_from_stream(
        self,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Upload a file from a source stream."""
    
    def upload_from_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        source,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> tornado.concurrent.Future:
        """Upload a file with a specific ID."""
    
    def open_upload_stream(
        self,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> MotorGridIn:
        """Open an upload stream for writing."""
    
    def open_upload_stream_with_id(
        self,
        file_id: Any,
        filename: str,
        chunk_size_bytes: Optional[int] = None,
        metadata: Optional[Dict[str, Any]] = None,
        session=None
    ) -> MotorGridIn:
        """Open an upload stream with a specific ID."""
    
    # Download Operations
    def download_to_stream(
        self,
        file_id: Any,
        destination,
        session=None
    ) -> tornado.concurrent.Future:
        """Download a file by ID to a destination stream."""
    
    def download_to_stream_by_name(
        self,
        filename: str,
        destination,
        revision: int = -1,
        session=None
    ) -> tornado.concurrent.Future:
        """Download a file by name to a destination stream."""
    
    def open_download_stream(
        self,
        file_id: Any,
        session=None
    ) -> tornado.concurrent.Future:
        """Open a download stream by file ID."""
    
    def open_download_stream_by_name(
        self,
        filename: str,
        revision: int = -1,
        session=None
    ) -> tornado.concurrent.Future:
        """Open a download stream by filename."""
    
    # File Management
    def delete(self, file_id: Any, session=None) -> tornado.concurrent.Future:
        """Delete a file by ID."""
    
    def rename(
        self,
        file_id: Any,
        new_filename: str,
        session=None
    ) -> tornado.concurrent.Future:
        """Rename a file."""
    
    def find(
        self,
        filter: Optional[Dict[str, Any]] = None,
        batch_size: int = 0,
        limit: int = 0,
        no_cursor_timeout: bool = False,
        skip: int = 0,
        sort: Optional[List[Tuple[str, int]]] = None,
        session=None
    ) -> MotorGridOutCursor:
        """Find files matching the filter."""

GridFS Upload Stream

Stream interface for uploading files to GridFS with write operations and metadata management.

# AsyncIO Upload Stream
class AsyncIOMotorGridIn:
    # Properties (read-only after upload starts)
    @property
    def _id(self) -> Any:
        """The file's unique identifier."""
    
    @property
    def filename(self) -> str:
        """The file's name."""
    
    @property
    def name(self) -> str:
        """Alias for filename."""
    
    @property
    def content_type(self) -> Optional[str]:
        """The file's content type."""
    
    @property
    def length(self) -> int:
        """The current length of the file."""
    
    @property
    def chunk_size(self) -> int:
        """The chunk size for this file."""
    
    @property
    def upload_date(self) -> Optional[datetime.datetime]:
        """The upload date (available after close)."""
    
    @property
    def metadata(self) -> Optional[Dict[str, Any]]:
        """The file's metadata."""
    
    @property
    def closed(self) -> bool:
        """Whether the file is closed."""
    
    # Write Operations
    async def write(self, data: bytes) -> None:
        """Write data to the file."""
    
    async def writelines(self, lines: List[bytes]) -> None:
        """Write multiple lines to the file."""
    
    # Stream Management
    async def close(self) -> None:
        """Close and finalize the file upload."""
    
    async def abort(self) -> None:
        """Abort the upload and delete any chunks."""
    
    # Metadata Management
    async def set(self, name: str, value: Any) -> None:
        """Set a metadata field."""
    
    # Stream Properties
    def writable(self) -> bool:
        """Whether the stream is writable."""

# Tornado Upload Stream
class MotorGridIn:
    # Properties (identical to AsyncIO version)
    @property
    def _id(self) -> Any: ...
    @property
    def filename(self) -> str: ...
    @property
    def name(self) -> str: ...
    @property
    def content_type(self) -> Optional[str]: ...
    @property
    def length(self) -> int: ...
    @property
    def chunk_size(self) -> int: ...
    @property
    def upload_date(self) -> Optional[datetime.datetime]: ...
    @property
    def metadata(self) -> Optional[Dict[str, Any]]: ...
    @property
    def closed(self) -> bool: ...
    
    # Write Operations (return Tornado Futures)
    def write(self, data: bytes) -> tornado.concurrent.Future: ...
    def writelines(self, lines: List[bytes]) -> tornado.concurrent.Future: ...
    def close(self) -> tornado.concurrent.Future: ...
    def abort(self) -> tornado.concurrent.Future: ...
    def set(self, name: str, value: Any) -> tornado.concurrent.Future: ...
    def writable(self) -> bool: ...

GridFS Download Stream

Stream interface for downloading files from GridFS with read operations and file metadata access.

# AsyncIO Download Stream
class AsyncIOMotorGridOut:
    # Properties (available after open())
    @property
    def _id(self) -> Any:
        """The file's unique identifier."""
    
    @property
    def filename(self) -> str:
        """The file's name."""
    
    @property
    def name(self) -> str:
        """Alias for filename."""
    
    @property
    def content_type(self) -> Optional[str]:
        """The file's content type."""
    
    @property
    def length(self) -> int:
        """The file's length in bytes."""
    
    @property
    def chunk_size(self) -> int:
        """The chunk size for this file."""
    
    @property
    def upload_date(self) -> datetime.datetime:
        """When the file was uploaded."""
    
    @property
    def metadata(self) -> Optional[Dict[str, Any]]:
        """The file's metadata."""
    
    @property
    def aliases(self) -> Optional[List[str]]:
        """The file's aliases (deprecated)."""
    
    # Read Operations
    async def open(self) -> AsyncIOMotorGridOut:
        """Open the file for reading (must be called first)."""
    
    async def read(self, size: int = -1) -> bytes:
        """Read up to size bytes from the file."""
    
    async def readchunk(self) -> bytes:
        """Read one chunk from the file."""
    
    async def readline(self, size: int = -1) -> bytes:
        """Read one line from the file."""
    
    # Stream Navigation
    def seek(self, pos: int, whence: int = 0) -> int:
        """Seek to a position in the file."""
    
    def tell(self) -> int:
        """Get the current position in the file."""
    
    def close(self) -> None:
        """Close the file."""
    
    # Stream Properties
    def readable(self) -> bool:
        """Whether the stream is readable."""
    
    def seekable(self) -> bool:
        """Whether the stream supports seeking."""

# Tornado Download Stream
class MotorGridOut:
    # Properties (identical to AsyncIO version)
    @property
    def _id(self) -> Any: ...
    @property
    def filename(self) -> str: ...
    @property
    def name(self) -> str: ...
    @property
    def content_type(self) -> Optional[str]: ...
    @property
    def length(self) -> int: ...
    @property
    def chunk_size(self) -> int: ...
    @property
    def upload_date(self) -> datetime.datetime: ...
    @property
    def metadata(self) -> Optional[Dict[str, Any]]: ...
    @property
    def aliases(self) -> Optional[List[str]]: ...
    
    # Read Operations (return Tornado Futures)
    def open(self) -> tornado.concurrent.Future: ...
    def read(self, size: int = -1) -> tornado.concurrent.Future: ...
    def readchunk(self) -> tornado.concurrent.Future: ...
    def readline(self, size: int = -1) -> tornado.concurrent.Future: ...
    
    # Stream Navigation (synchronous)
    def seek(self, pos: int, whence: int = 0) -> int: ...
    def tell(self) -> int: ...
    def close(self) -> None: ...
    def readable(self) -> bool: ...
    def seekable(self) -> bool: ...

GridFS Cursor

Cursor for iterating over GridFS file metadata and accessing file information.

# AsyncIO GridFS Cursor
class AsyncIOMotorGridOutCursor:
    def limit(self, limit: int) -> AsyncIOMotorGridOutCursor:
        """Limit the number of results."""
    
    def skip(self, skip: int) -> AsyncIOMotorGridOutCursor:
        """Skip a number of results."""
    
    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> AsyncIOMotorGridOutCursor:
        """Sort the results."""
    
    def batch_size(self, batch_size: int) -> AsyncIOMotorGridOutCursor:
        """Set the batch size."""
    
    async def to_list(self, length: Optional[int] = None) -> List[AsyncIOMotorGridOut]:
        """Convert cursor to a list."""
    
    def __aiter__(self) -> AsyncIOMotorGridOutCursor:
        """Async iterator protocol."""
    
    async def __anext__(self) -> AsyncIOMotorGridOut:
        """Get next file."""

# Tornado GridFS Cursor  
class MotorGridOutCursor:
    def limit(self, limit: int) -> MotorGridOutCursor: ...
    def skip(self, skip: int) -> MotorGridOutCursor: ...
    def sort(
        self,
        key_or_list: Union[str, List[Tuple[str, int]]],
        direction: Optional[int] = None
    ) -> MotorGridOutCursor: ...
    def batch_size(self, batch_size: int) -> MotorGridOutCursor: ...
    
    def to_list(self, length: Optional[int] = None) -> tornado.concurrent.Future: ...
    def next_object(self) -> tornado.concurrent.Future: ...

Usage Examples

AsyncIO File Upload and Download

import asyncio
import motor.motor_asyncio
from io import BytesIO

async def gridfs_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    
    # Create GridFS bucket
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)
    
    # Upload from bytes
    file_data = b"Hello, GridFS world!"
    source = BytesIO(file_data)
    
    file_id = await bucket.upload_from_stream(
        "hello.txt",
        source,
        metadata={"type": "greeting", "author": "Motor"}
    )
    print(f"Uploaded file with ID: {file_id}")
    
    # Download to bytes
    destination = BytesIO()
    await bucket.download_to_stream(file_id, destination)
    
    downloaded_data = destination.getvalue()
    print(f"Downloaded: {downloaded_data.decode()}")
    
    # Stream upload
    upload_stream = bucket.open_upload_stream(
        "large_file.dat",
        metadata={"description": "Large file example"}
    )
    
    # Write data in chunks
    for i in range(10):
        chunk = f"Chunk {i}\n".encode()
        await upload_stream.write(chunk)
    
    await upload_stream.close()
    print(f"Uploaded large file with ID: {upload_stream._id}")
    
    # Stream download
    download_stream = await bucket.open_download_stream(upload_stream._id)
    
    # Read file information
    print(f"File: {download_stream.filename}")
    print(f"Size: {download_stream.length} bytes")
    print(f"Content Type: {download_stream.content_type}")
    print(f"Upload Date: {download_stream.upload_date}")
    print(f"Metadata: {download_stream.metadata}")
    
    # Read data
    data = await download_stream.read()
    print(f"Downloaded data: {data.decode()}")
    
    download_stream.close()
    client.close()

asyncio.run(gridfs_example())

File Management Operations

import asyncio
import motor.motor_asyncio

async def file_management_example():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)
    
    # Find files
    cursor = bucket.find({"metadata.type": "image"})
    async for file_doc in cursor:
        print(f"Found file: {file_doc.filename} (ID: {file_doc._id})")
    
    # Find files with sorting and limiting
    cursor = bucket.find().sort("uploadDate", -1).limit(5)
    recent_files = await cursor.to_list(5)
    
    for file_doc in recent_files:
        print(f"Recent file: {file_doc.filename}")
    
    # Rename a file
    if recent_files:
        file_id = recent_files[0]._id
        await bucket.rename(file_id, "renamed_file.txt")
        print(f"Renamed file {file_id}")
    
    # Delete a file
    if len(recent_files) > 1:
        file_id = recent_files[1]._id
        await bucket.delete(file_id)
        print(f"Deleted file {file_id}")
    
    client.close()

asyncio.run(file_management_example())

Large File Streaming

import asyncio
import motor.motor_asyncio

async def large_file_streaming():
    client = motor.motor_asyncio.AsyncIOMotorClient()
    db = client.test_database
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(db)
    
    # Upload large file in chunks
    upload_stream = bucket.open_upload_stream(
        "video.mp4",
        chunk_size_bytes=1024*1024,  # 1MB chunks
        metadata={"type": "video", "codec": "h264"}
    )
    
    # Simulate large file upload
    total_size = 0
    for i in range(100):  # 100MB file simulation
        chunk = b"X" * 1024 * 1024  # 1MB of data
        await upload_stream.write(chunk)
        total_size += len(chunk)
        
        if i % 10 == 0:
            print(f"Uploaded {total_size / (1024*1024):.1f}MB")
    
    await upload_stream.close()
    print(f"Upload complete. File ID: {upload_stream._id}")
    
    # Stream download with progress
    download_stream = await bucket.open_download_stream(upload_stream._id)
    print(f"Downloading {download_stream.length / (1024*1024):.1f}MB file")
    
    downloaded_size = 0
    while downloaded_size < download_stream.length:
        chunk = await download_stream.readchunk()
        if not chunk:
            break
        
        downloaded_size += len(chunk)
        progress = (downloaded_size / download_stream.length) * 100
        
        if downloaded_size % (10 * 1024 * 1024) == 0:  # Every 10MB
            print(f"Downloaded {progress:.1f}%")
    
    download_stream.close()
    print("Download complete")
    client.close()

asyncio.run(large_file_streaming())

Types

from typing import Any, Optional, Union, Dict, List, Tuple
from datetime import datetime
import io

# GridFS-specific types
GridFSFile = Dict[str, Any]  # GridFS file document
ChunkData = bytes  # File chunk data
FileId = Any  # GridFS file identifier (usually ObjectId)

Install with Tessl CLI

npx tessl i tessl/pypi-motor

docs

asyncio-operations.md

change-streams.md

client-encryption.md

cursor-operations.md

gridfs-operations.md

index.md

tornado-operations.md

web-integration.md

tile.json