CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-motor

Non-blocking MongoDB driver for Python asyncio and Tornado applications

Pending
Overview
Eval results
Files

docs/web-integration.md

Web Integration

HTTP handlers for serving GridFS files through web frameworks. Motor provides optimized handlers for both Tornado and aiohttp with support for HTTP caching, range requests, content negotiation, and efficient file streaming.

Capabilities

Tornado GridFS Handler

RequestHandler for serving GridFS files in Tornado web applications with full HTTP feature support.

class GridFSHandler(tornado.web.RequestHandler):
    """
    Serve GridFS files from a Tornado application.

    Advertised features:
    - ETag / Last-Modified based HTTP caching
    - Partial content through range requests
    - Content-Type chosen from the file extension
    - Pluggable file lookup
    - Tunable cache lifetime

    This is an interface outline: every method below carries only its
    signature and documentation.
    """

    def initialize(
        self,
        database: MotorDatabase,
        root_collection: str = 'fs',
        get_gridfs_file: Optional[Callable] = None,
        get_cache_time: Optional[Callable] = None
    ) -> None:
        """
        Configure the handler for one route.

        Args:
            database: The MotorDatabase whose GridFS data is served.
            root_collection: Prefix of the GridFS collections; 'fs' unless
                overridden.
            get_gridfs_file: Optional callable replacing the file lookup.
            get_cache_time: Optional callable replacing the cache policy.
        """

    async def get(self, filename: str, include_body: bool = True) -> None:
        """
        Answer a GET request for a stored file.

        Args:
            filename: Name of the requested file.
            include_body: When False, the file content is left out of the
                response.
        """

    async def head(self, filename: str) -> None:
        """Answer a HEAD request: metadata only, no content."""

    # Hooks intended to be overridden in subclasses
    async def get_gridfs_file(
        self,
        fs: MotorGridFSBucket,
        filename: str
    ) -> MotorGridOut:
        """
        Locate the GridFS file for a request.

        The default behaviour is a lookup by name. Subclasses can replace it
        with, for example, a file-ID lookup, a permission check or a path
        rewrite.

        NOTE(review): upstream Motor appears to document this hook as
        (bucket, filename, request) - confirm against the installed version.

        Args:
            fs: The GridFS bucket to read from.
            filename: The filename taken from the URL.

        Returns:
            The MotorGridOut for the requested file.
        """

    def get_cache_time(
        self,
        filename: str,
        modified: datetime.datetime,
        mime_type: str
    ) -> int:
        """
        Decide how long a response may be cached.

        Args:
            filename: Name of the file being served.
            modified: When the file was last modified.
            mime_type: MIME type of the file.

        Returns:
            Cache lifetime in seconds; 0 means no caching.
        """

aiohttp GridFS Handler

Handler for serving GridFS files in aiohttp web applications with async/await support.

class AIOHTTPGridFS:
    """
    Serve GridFS files from an aiohttp application.

    Advertised features:
    - ETag / Last-Modified based HTTP caching
    - Streamed file responses
    - Content-Type detection
    - Pluggable file lookup and cache policy
    - Conditional requests via If-Modified-Since

    This is an interface outline: the methods below carry only their
    signatures and documentation.
    """

    def __init__(
        self,
        database: AsyncIOMotorDatabase,
        root_collection: str = 'fs',
        get_gridfs_file: Optional[Callable] = None,
        get_cache_time: Optional[Callable] = None,
        set_extra_headers: Optional[Callable] = None
    ) -> None:
        """
        Build a handler bound to one database.

        Args:
            database: The AsyncIOMotorDatabase holding the GridFS data.
            root_collection: Prefix of the GridFS collections.
            get_gridfs_file: Optional callable replacing the file lookup.
            get_cache_time: Optional callable replacing the cache policy.
            set_extra_headers: Optional callable that adds response headers.
        """

    async def __call__(self, request: aiohttp.web.Request) -> aiohttp.web.Response:
        """
        Serve the GridFS file addressed by an incoming request.

        Args:
            request: The aiohttp request being handled.

        Returns:
            An aiohttp response carrying the file content, or an error.
        """

# Customization Functions for aiohttp
async def get_gridfs_file(
    bucket: AsyncIOMotorGridFSBucket,
    filename: str,
    request: aiohttp.web.Request
) -> AsyncIOMotorGridOut:
    """
    File lookup hook used by the aiohttp handler by default.

    Supply a replacement with the same signature to change how files are
    found, for instance to:
    - look files up by ID rather than by name
    - enforce access control
    - rewrite paths or names
    - record logs or metrics

    Args:
        bucket: The GridFS bucket to read from.
        filename: The filename taken from the URL path.
        request: The aiohttp request, available for context.

    Returns:
        The AsyncIOMotorGridOut for the requested file.
    """

def get_cache_time(
    filename: str,
    modified: datetime.datetime,
    mime_type: Optional[str]
) -> int:
    """
    Cache policy hook used by the aiohttp handler by default.

    Supply a replacement with the same signature to implement policies
    such as:
    - per-file-type cache lifetimes
    - no caching for sensitive files
    - long-lived caching of static assets

    Args:
        filename: Name of the file being served.
        modified: Upload/modification timestamp of the file.
        mime_type: MIME type of the file; may be None.

    Returns:
        Cache lifetime in seconds; 0 means no aggressive caching.
    """

def set_extra_headers(
    response: aiohttp.web.Response,
    gridout: AsyncIOMotorGridOut
) -> None:
    """
    Header hook used by the aiohttp handler by default.

    Supply a replacement with the same signature to attach headers such as:
    - security headers (CSP, CORS, ...)
    - Content-Encoding
    - application-specific headers

    Args:
        response: The aiohttp response to modify in place.
        gridout: The GridFS file that is being served.
    """

Usage Examples

Basic Tornado GridFS Server

import tornado.web
import tornado.ioloop
import motor.motor_tornado
from motor.web import GridFSHandler

def make_app():
    """Build the Tornado application that exposes GridFS files under /files/."""
    # No arguments are passed, so the client connects with its defaults.
    database = motor.motor_tornado.MotorClient().my_app_db

    gridfs_options = {
        "database": database,
        "root_collection": "fs",  # the default GridFS collection prefix
    }
    routes = [
        # The captured group after /files/ is handed to GridFSHandler.
        (r"/files/(.*)", GridFSHandler, gridfs_options),
        # Ordinary request handlers
        (r"/", MainHandler),
    ]
    return tornado.web.Application(routes)

class MainHandler(tornado.web.RequestHandler):
    """Landing page that points visitors at the /files/ route."""

    def get(self):
        """Write a small HTML snippet describing where files are served."""
        landing_page = """
        <h1>GridFS File Server</h1>
        <p>Files available at <a href="/files/">/files/&lt;filename&gt;</a></p>
        """
        self.write(landing_page)

if __name__ == "__main__":
    # Script entry point: build the application and listen on port 8888.
    app = make_app()
    app.listen(8888)
    print("Server running on http://localhost:8888")
    # Hand control to the Tornado IOLoop so requests are served.
    tornado.ioloop.IOLoop.current().start()

Custom Tornado GridFS Handler

import tornado.web
import tornado.ioloop
import motor.motor_tornado
from motor.web import GridFSHandler
import mimetypes
from bson import ObjectId

class CustomGridFSHandler(GridFSHandler):
    """GridFS handler that accepts ObjectIds as well as filenames and
    applies a MIME-type based cache policy."""

    async def get_gridfs_file(self, fs, filename, request=None):
        """Open the requested file, by ObjectId when possible, else by name.

        Parameters:
        - fs: GridFS bucket instance
        - filename: Path component captured from the URL; either a
          filename or the string form of an ObjectId
        - request: The HTTP request. Optional so that the override works
          whether the base handler calls the hook with two or with three
          arguments.

        Returns:
        The opened download stream for the file.

        Raises:
        Whatever the by-name lookup raises when no file matches.
        """
        # Only attempt the ID lookup for strings that really are ObjectIds,
        # instead of relying on a catch-all to absorb the parse error.
        if ObjectId.is_valid(filename):
            try:
                return await fs.open_download_stream(ObjectId(filename))
            except Exception:
                # Deliberate best effort: a valid-looking ID with no
                # matching file falls through to the filename lookup.
                # `Exception` (unlike a bare `except`) lets task
                # cancellation and KeyboardInterrupt propagate.
                pass

        return await fs.open_download_stream_by_name(filename)

    def get_cache_time(self, filename, modified, mime_type):
        """Return the cache lifetime in seconds for a file.

        Parameters:
        - filename: Name of the file (unused by this policy)
        - modified: Modification timestamp (unused by this policy)
        - mime_type: MIME type of the file; may be None or empty

        Returns:
        One week for images, one day for CSS/JavaScript, two hours for
        video, and 0 (no aggressive caching) for everything else.
        """
        if mime_type:
            # Long cache for images and static assets
            if mime_type.startswith('image/'):
                return 3600 * 24 * 7  # 1 week
            elif mime_type in ['text/css', 'application/javascript']:
                return 3600 * 24  # 1 day
            elif mime_type.startswith('video/'):
                return 3600 * 2  # 2 hours

        # Default: no aggressive caching
        return 0

class UploadHandler(tornado.web.RequestHandler):
    """Accept a form upload and store it in GridFS."""

    async def post(self):
        """Store the uploaded 'file' form field and report its new ID.

        Responds with status 400 and an error payload when the request
        carries no 'file' field.
        """
        from io import BytesIO

        bucket = motor.motor_tornado.MotorGridFSBucket(
            self.application.settings['database']
        )

        # Reject requests that carry no 'file' form field.
        if 'file' not in self.request.files:
            self.set_status(400)
            self.write({"error": "No file uploaded"})
            return

        # Only the first file submitted under the 'file' field is stored.
        upload = self.request.files['file'][0]
        name = upload['filename']
        mime = upload.get('content_type', 'application/octet-stream')
        payload = BytesIO(upload['body'])

        stored_id = await bucket.upload_from_stream(
            name,
            payload,
            metadata={'content_type': mime}
        )

        summary = {
            "message": "File uploaded successfully",
            "file_id": str(stored_id),
            "filename": name,
            "url": f"/files/{stored_id}"
        }
        self.write(summary)

def make_app():
    """Build the Tornado application with file serving, upload and index routes."""
    database = motor.motor_tornado.MotorClient().file_server_db

    routes = [
        (r"/files/(.*)", CustomGridFSHandler, {"database": database}),
        (r"/upload", UploadHandler),
        (r"/", MainHandler),
    ]
    # The database is also passed as an application setting; UploadHandler
    # reads it from self.application.settings['database'].
    return tornado.web.Application(routes, database=database)

if __name__ == "__main__":
    # Script entry point: build the application and listen on port 8888.
    app = make_app()
    app.listen(8888)
    # Hand control to the Tornado IOLoop so requests are served.
    tornado.ioloop.IOLoop.current().start()

Basic aiohttp GridFS Server

import aiohttp.web
import motor.motor_asyncio
from motor.aiohttp import AIOHTTPGridFS

async def create_app():
    """Assemble an aiohttp application that serves GridFS files at /files/{filename}."""
    database = motor.motor_asyncio.AsyncIOMotorClient().my_app_db
    serve_file = AIOHTTPGridFS(database)

    app = aiohttp.web.Application()

    # The route's path variable is named "filename".
    files_resource = app.router.add_resource("/files/{filename}")
    for method in ("GET", "HEAD"):
        files_resource.add_route(method, serve_file)

    async def index(request):
        """Return the HTML landing page."""
        return aiohttp.web.Response(
            text="""
            <h1>GridFS File Server</h1>
            <p>Files available at <a href="/files/">/files/&lt;filename&gt;</a></p>
            """,
            content_type='text/html'
        )

    app.router.add_get('/', index)
    return app

if __name__ == "__main__":
    # create_app is an `async def`, so `app` is a coroutine object here,
    # not an Application; it is passed to run_app unchanged.
    app = create_app()
    aiohttp.web.run_app(app, host='localhost', port=8888)

Custom aiohttp GridFS Handler

import aiohttp.web
import motor.motor_asyncio
from motor.aiohttp import AIOHTTPGridFS, get_gridfs_file, get_cache_time, set_extra_headers
from bson import ObjectId
import json

# Custom file retrieval function
async def custom_get_gridfs_file(bucket, filename, request):
    """Open the requested file after a presence check on the Authorization header.

    Parameters:
    - bucket: GridFS bucket instance
    - filename: Path component from the URL; either a filename or the
      string form of an ObjectId
    - request: aiohttp request, used here only for its headers

    Returns:
    The opened download stream for the file.

    Raises:
    aiohttp.web.HTTPUnauthorized when the request has no (or an empty)
    Authorization header; otherwise whatever the by-name lookup raises
    when no file matches.
    """
    # Check authentication (example): only the presence of the header is
    # tested, its value is not validated.
    auth_header = request.headers.get('Authorization')
    if not auth_header:
        raise aiohttp.web.HTTPUnauthorized(text="Authentication required")

    # Only attempt the ID lookup for strings that really are ObjectIds,
    # instead of relying on a catch-all to absorb the parse error.
    if ObjectId.is_valid(filename):
        try:
            return await bucket.open_download_stream(ObjectId(filename))
        except Exception:
            # Deliberate best effort: a valid-looking ID with no matching
            # file falls through to the filename lookup. `Exception`
            # (unlike a bare `except`) lets task cancellation and
            # KeyboardInterrupt propagate.
            pass

    return await bucket.open_download_stream_by_name(filename)

# Custom cache policy
def custom_get_cache_time(filename, modified, mime_type):
    """Return a cache lifetime in seconds chosen from the MIME type.

    Images get 30 days, videos 7 days, CSS/JavaScript 1 day. Anything
    else, including a missing MIME type, gets 1 hour. `filename` and
    `modified` are accepted for the hook signature but not consulted.
    """
    one_hour = 3600
    one_day = one_hour * 24

    # No MIME type: nothing to classify, use the default.
    if not mime_type:
        return one_hour

    if mime_type.startswith('image/'):
        return one_day * 30
    if mime_type.startswith('video/'):
        return one_day * 7
    if mime_type in ('text/css', 'application/javascript'):
        return one_day

    return one_hour

# Custom headers
def custom_set_extra_headers(response, gridout):
    """Attach CORS, security and file-author headers to a response.

    `response.headers` is modified in place. The X-File-Author header is
    only added when the file's metadata contains an 'author' entry.
    """
    fixed_headers = (
        # CORS
        ('Access-Control-Allow-Origin', '*'),
        ('Access-Control-Allow-Methods', 'GET, HEAD'),
        # Security
        ('X-Content-Type-Options', 'nosniff'),
        ('X-Frame-Options', 'DENY'),
    )
    for header_name, header_value in fixed_headers:
        response.headers[header_name] = header_value

    # Metadata may be missing or empty, so test it before looking inside.
    if gridout.metadata and 'author' in gridout.metadata:
        response.headers['X-File-Author'] = str(gridout.metadata['author'])

async def upload_handler(request):
    """Stream the first multipart field named 'file' into GridFS.

    Returns a JSON description of the stored file, or a 400 JSON error
    when the body contains no 'file' field.
    """
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(request.app['database'])

    parts = await request.multipart()

    while True:
        part = await parts.next()
        if not part:
            # Ran out of fields without seeing 'file'.
            break

        if part.name != 'file':
            continue

        name = part.filename
        mime = part.headers.get('Content-Type', 'application/octet-stream')

        grid_in = bucket.open_upload_stream(
            name,
            metadata={
                'content_type': mime,
                'uploaded_by': request.headers.get('X-User-ID', 'anonymous')
            }
        )

        # Copy the field into GridFS one chunk at a time so the whole
        # upload never has to be held in memory.
        while True:
            chunk = await part.read_chunk()
            if not chunk:
                break
            await grid_in.write(chunk)

        await grid_in.close()

        new_id = grid_in._id
        return aiohttp.web.json_response({
            'message': 'File uploaded successfully',
            'file_id': str(new_id),
            'filename': name,
            'url': f'/files/{new_id}'
        })

    return aiohttp.web.json_response(
        {'error': 'No file uploaded'},
        status=400
    )

async def file_list_handler(request):
    """Return a JSON listing of the 50 most recently uploaded files.

    Each entry carries the file's id, filename, length, upload date
    (ISO 8601), content type and download URL.

    The content type is taken from the file's `content_type` attribute
    when set. Otherwise it falls back to `metadata['content_type']`,
    which is where the upload code in this example stores it; without
    the fallback those files would always be listed with no type.
    """
    database = request.app['database']
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(database)

    files = []
    # Newest first, capped at 50 entries.
    cursor = bucket.find().sort('uploadDate', -1).limit(50)

    async for file_doc in cursor:
        content_type = file_doc.content_type
        if not content_type:
            # Metadata may be absent altogether, hence the `or {}`.
            content_type = (file_doc.metadata or {}).get('content_type')

        files.append({
            'id': str(file_doc._id),
            'filename': file_doc.filename,
            'length': file_doc.length,
            'uploadDate': file_doc.upload_date.isoformat(),
            'contentType': content_type,
            'url': f'/files/{file_doc._id}'
        })

    return aiohttp.web.json_response({'files': files})

async def create_advanced_app():
    """Assemble the aiohttp application with customized GridFS serving,
    upload and listing endpoints."""
    database = motor.motor_asyncio.AsyncIOMotorClient().advanced_file_server

    # GridFS handler wired to the custom lookup, cache and header hooks.
    serve_file = AIOHTTPGridFS(
        database,
        get_gridfs_file=custom_get_gridfs_file,
        get_cache_time=custom_get_cache_time,
        set_extra_headers=custom_set_extra_headers
    )

    app = aiohttp.web.Application()
    # The upload and listing handlers read the database from the app.
    app['database'] = database

    # File serving
    files_resource = app.router.add_resource("/files/{filename}")
    for method in ("GET", "HEAD"):
        files_resource.add_route(method, serve_file)

    # File management
    app.router.add_post('/upload', upload_handler)
    app.router.add_get('/api/files', file_list_handler)

    return app

if __name__ == "__main__":
    # create_advanced_app is an `async def`, so `app` is a coroutine object
    # here, not an Application; it is passed to run_app unchanged.
    app = create_advanced_app()
    aiohttp.web.run_app(app, host='localhost', port=8888)

GridFS File Upload Utility

import asyncio
import motor.motor_asyncio
import aiofiles
import mimetypes
import os

async def upload_file_to_gridfs(database, local_path, gridfs_filename=None):
    """Upload a local file to GridFS and return the new file's ID.

    Parameters:
    - database: Motor database that holds the GridFS bucket
    - local_path: Path of the file to upload
    - gridfs_filename: Name to store the file under; defaults to the
      base name of `local_path`

    Returns:
    The ID returned by `upload_from_stream`.

    Raises:
    OSError (e.g. FileNotFoundError) when `local_path` cannot be read.
    """
    bucket = motor.motor_asyncio.AsyncIOMotorGridFSBucket(database)

    if not gridfs_filename:
        gridfs_filename = os.path.basename(local_path)

    # Detect content type from the file name, with a generic fallback
    content_type, _ = mimetypes.guess_type(local_path)
    if not content_type:
        content_type = 'application/octet-stream'

    # Get file stats
    stat = os.stat(local_path)

    # upload_from_stream needs a source whose read() returns bytes
    # directly. An aiofiles handle's read() returns a coroutine instead,
    # so a regular binary file object is used here.
    with open(local_path, 'rb') as f:
        file_id = await bucket.upload_from_stream(
            gridfs_filename,
            f,
            metadata={
                'content_type': content_type,
                'original_path': local_path,
                'file_size': stat.st_size,
                'upload_tool': 'motor_utility'
            }
        )

    print(f"Uploaded {local_path} as {gridfs_filename}")
    print(f"GridFS ID: {file_id}")
    return file_id

async def main():
    """Upload a fixed list of local files, reporting any that do not exist."""
    client = motor.motor_asyncio.AsyncIOMotorClient()
    database = client.file_storage

    # Placeholder paths; missing ones are reported and skipped.
    candidate_paths = (
        '/path/to/image.jpg',
        '/path/to/document.pdf',
        '/path/to/video.mp4',
    )

    for file_path in candidate_paths:
        if not os.path.exists(file_path):
            print(f"File not found: {file_path}")
            continue
        await upload_file_to_gridfs(database, file_path)

    client.close()

if __name__ == "__main__":
    # Script entry point: run the upload coroutine to completion.
    asyncio.run(main())

Performance Optimization Example

import aiohttp.web
import motor.motor_asyncio
from motor.aiohttp import AIOHTTPGridFS
import asyncio
import time

# Optimized file retrieval with caching
class CachedGridFSHandler(AIOHTTPGridFS):
    """AIOHTTPGridFS variant that can answer HEAD requests from an
    in-memory metadata cache.

    Nothing in this class calls `_cache_file`, so the cache stays empty
    (and every request goes to the parent handler) until other code
    populates it.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments to AIOHTTPGridFS and set up the cache."""
        super().__init__(*args, **kwargs)
        # filename -> (cached data, time stored)
        self._file_cache = {}
        # Entry lifetime in seconds (5 minutes)
        self._cache_ttl = 300

    async def _get_cached_file(self, filename):
        """Return the cached data for `filename`, or None when there is
        no entry or the entry is older than the TTL."""
        try:
            entry = self._file_cache[filename]
        except KeyError:
            return None

        payload, stored_at = entry
        age = time.time() - stored_at
        return payload if age < self._cache_ttl else None

    async def _cache_file(self, filename, file_data):
        """Store `file_data` for `filename`, stamped with the current time."""
        stamped = (file_data, time.time())
        self._file_cache[filename] = stamped

    async def __call__(self, request):
        """Serve a request, short-circuiting HEAD requests that hit the cache.

        A cached entry is expected to be a mapping with a 'headers' key.
        """
        name = request.match_info['filename']
        hit = await self._get_cached_file(name)

        # Anything but a HEAD request with a cache hit is delegated to
        # the regular handler.
        if not hit or request.method != 'HEAD':
            return await super().__call__(request)

        head_response = aiohttp.web.Response()
        head_response.headers.update(hit['headers'])
        return head_response

# Usage with performance monitoring
async def create_optimized_app():
    """Assemble the aiohttp application with a larger connection pool,
    the caching GridFS handler, a timing middleware and a health check.

    Returns:
    The configured aiohttp.web.Application.
    """
    client = motor.motor_asyncio.AsyncIOMotorClient(
        maxPoolSize=50,  # Increase connection pool
        minPoolSize=10
    )
    database = client.high_performance_files

    # Use cached handler
    gridfs_handler = CachedGridFSHandler(database)

    app = aiohttp.web.Application()

    # Middleware for request timing. A middleware written with the
    # (request, handler) signature must be marked with
    # @aiohttp.web.middleware; without the marker aiohttp does not treat
    # it as this kind of middleware.
    @aiohttp.web.middleware
    async def timing_middleware(request, handler):
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments.
        start = time.perf_counter()
        response = await handler(request)
        duration = time.perf_counter() - start
        # NOTE(review): for responses whose headers were already sent by
        # the handler (streamed file bodies) this header is presumably
        # set too late to reach the client - confirm.
        response.headers['X-Response-Time'] = f"{duration:.3f}s"
        return response

    app.middlewares.append(timing_middleware)

    # GridFS routes
    resource = app.router.add_resource("/files/{filename}")
    resource.add_route("GET", gridfs_handler)
    resource.add_route("HEAD", gridfs_handler)

    # Health check endpoint
    async def health_check(request):
        """Report liveness as a small JSON document."""
        return aiohttp.web.json_response({"status": "healthy"})

    app.router.add_get('/health', health_check)

    return app

if __name__ == "__main__":
    # create_optimized_app is an `async def`, so `app` is a coroutine object
    # here, not an Application; it is passed to run_app unchanged.
    app = create_optimized_app()
    aiohttp.web.run_app(app, host='localhost', port=8888)

Types

from typing import Any, Awaitable, Optional, Callable, Union
import tornado.web
import aiohttp.web
from datetime import datetime

# Handler types
GridFSHandlerType = tornado.web.RequestHandler
# The aiohttp handler's __call__ is a coroutine function, so calling it
# yields an awaitable response rather than the response itself.
AIOHTTPHandlerType = Callable[[aiohttp.web.Request], Awaitable[aiohttp.web.Response]]

# Customization function types
GetGridFSFileFunc = Union[
    Callable[[Any, str], Any],  # Tornado: (fs, filename) -> Future[MotorGridOut]
    Callable[[Any, str, aiohttp.web.Request], Any]  # aiohttp: (bucket, filename, request) -> AsyncIOMotorGridOut
]

GetCacheTimeFunc = Callable[[str, datetime, Optional[str]], int]  # (filename, modified, mime_type) -> seconds

SetExtraHeadersFunc = Union[
    Callable[[tornado.web.RequestHandler, Any], None],  # Tornado
    Callable[[aiohttp.web.Response, Any], None]  # aiohttp
]

Install with Tessl CLI

npx tessl i tessl/pypi-motor

docs

asyncio-operations.md

change-streams.md

client-encryption.md

cursor-operations.md

gridfs-operations.md

index.md

tornado-operations.md

web-integration.md

tile.json