FastAPI framework, high performance, easy to learn, fast to code, ready for production - slim version without standard dependencies
---
FastAPI provides comprehensive file upload and handling capabilities through the UploadFile class and File parameter function. This enables secure file uploads, async file operations, and integration with form data for building file processing applications.
Async file upload handler providing access to uploaded files with metadata and streaming capabilities.
class UploadFile:
"""
File upload handler with async methods for file operations.
Provides access to uploaded file content, metadata, and streaming
operations while maintaining memory efficiency for large files.
Attributes (read-only):
- filename: Original filename from client (can be None)
- size: File size in bytes (available after reading)
- headers: HTTP headers from multipart upload
- content_type: MIME type of the uploaded file
"""
filename: Optional[str]
size: Optional[int]
headers: Headers
content_type: Optional[str]
file: BinaryIO # Underlying file-like object
async def read(self, size: int = -1) -> bytes:
"""
Read file content as bytes.
Parameters:
- size: Maximum number of bytes to read (-1 for entire file)
Returns:
File content as bytes
Note: File position advances after reading
"""
async def write(self, data: Union[str, bytes]) -> None:
"""
Write data to file.
Parameters:
- data: String or bytes to write to file
Note: Useful for processing or modifying uploaded files
"""
async def seek(self, offset: int) -> None:
"""
Move file pointer to specific position.
Parameters:
- offset: Position to seek to (0 = beginning)
Note: Allows re-reading file content or random access
"""
async def close(self) -> None:
"""
Close the file and release resources.
Note: Files are automatically closed when request completes
"""Function for declaring file upload parameters in endpoint functions.
def File(
    default: Any = Undefined,
    *,
    media_type: str = "multipart/form-data",
    alias: Optional[str] = None,
    title: Optional[str] = None,
    description: Optional[str] = None,
    gt: Optional[float] = None,
    ge: Optional[float] = None,
    lt: Optional[float] = None,
    le: Optional[float] = None,
    min_length: Optional[int] = None,
    max_length: Optional[int] = None,
    regex: Optional[str] = None,
    example: Any = Undefined,
    examples: Optional[Dict[str, Any]] = None,
    deprecated: Optional[bool] = None,
    include_in_schema: bool = True,
    json_schema_extra: Union[Dict[str, Any], Callable[[Dict[str, Any]], None], None] = None,
    **extra: Any,
) -> Any:
    """
    Declare file upload parameter with validation constraints.

    Parameters:
        default: Default value (use ... for required files)
        media_type: Expected media type (defaults to multipart/form-data)
        alias: Alternative parameter name in OpenAPI schema
        title: Parameter title for OpenAPI documentation
        description: Parameter description for OpenAPI documentation
        gt, ge, lt, le: Numeric validation for file size
        min_length, max_length: File size validation
        regex: Pattern for filename validation
        example: Example value for OpenAPI documentation
        examples: Dictionary of examples for OpenAPI documentation
        deprecated: Mark parameter as deprecated
        include_in_schema: Include in OpenAPI schema
        json_schema_extra: Additional JSON schema properties

    Returns:
        - bytes: When type hint is bytes
        - UploadFile: When type hint is UploadFile
        - List[UploadFile]: When type hint is List[UploadFile] for multiple files
    """

from fastapi import FastAPI, File, UploadFile
from typing import Optional

app = FastAPI()


@app.post("/upload-file/")
async def upload_file(file: UploadFile = File(...)):
    """Upload single file as UploadFile object, returning its metadata."""
    return {
        "filename": file.filename,
        "content_type": file.content_type,
        # Reading the whole body to measure it; fine for an example,
        # but large files should be streamed in chunks instead.
        "size": len(await file.read()),
    }


@app.post("/upload-bytes/")
async def upload_bytes(file: bytes = File(...)):
    """Upload single file as bytes (whole body loaded into memory)."""
    return {"file_size": len(file)}

from fastapi import FastAPI, File, UploadFile
# Optional is required by upload_optional_multiple below; the original
# snippet imported only List, which made the example fail standalone.
from typing import List, Optional

app = FastAPI()


@app.post("/upload-multiple/")
async def upload_multiple_files(files: List[UploadFile] = File(...)):
    """Upload multiple files and report metadata for each."""
    results = []
    for file in files:
        content = await file.read()
        results.append({
            "filename": file.filename,
            "content_type": file.content_type,
            "size": len(content),
        })
    return {"files": results, "total_files": len(files)}


@app.post("/upload-optional-multiple/")
async def upload_optional_multiple(files: Optional[List[UploadFile]] = File(None)):
    """Upload multiple files (optional — request may omit them entirely)."""
    if not files:
        return {"message": "No files uploaded"}
    return {"files_uploaded": len(files)}

from fastapi import FastAPI, File, UploadFile, HTTPException
import os
from typing import List

app = FastAPI()

# Configuration
MAX_FILE_SIZE = 5 * 1024 * 1024  # 5MB
ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".pdf", ".txt"}
UPLOAD_DIR = "uploads"

# Ensure upload directory exists
os.makedirs(UPLOAD_DIR, exist_ok=True)


def validate_file(file: UploadFile) -> None:
    """Validate uploaded file's extension and declared content type.

    Raises HTTPException(400) on any violation.
    """
    # Check file extension (filename can be None, so guard first)
    if file.filename:
        file_ext = os.path.splitext(file.filename)[1].lower()
        if file_ext not in ALLOWED_EXTENSIONS:
            raise HTTPException(
                status_code=400,
                detail=f"File type {file_ext} not allowed. Allowed types: {ALLOWED_EXTENSIONS}"
            )
    # Check content type (client-declared; not proof of actual content)
    allowed_content_types = {
        "image/jpeg", "image/png", "image/gif",
        "application/pdf", "text/plain"
    }
    if file.content_type not in allowed_content_types:
        raise HTTPException(
            status_code=400,
            detail=f"Content type {file.content_type} not allowed"
        )


@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    """Validate, size-check, and persist an uploaded file."""
    # Validate file
    validate_file(file)
    # Read file content
    content = await file.read()
    # Check file size
    if len(content) > MAX_FILE_SIZE:
        raise HTTPException(
            status_code=413,
            detail=f"File size {len(content)} exceeds maximum {MAX_FILE_SIZE} bytes"
        )
    # Save file. basename() strips any directory components a malicious
    # client could embed in the filename (path traversal), and we fall
    # back to a fixed name when the client sent no filename at all.
    safe_name = os.path.basename(file.filename or "upload.bin")
    file_path = os.path.join(UPLOAD_DIR, safe_name)
    with open(file_path, "wb") as f:
        f.write(content)
    return {
        "filename": safe_name,
        "content_type": file.content_type,
        "size": len(content),
        "saved_path": file_path
    }

from fastapi import FastAPI, File, Form, UploadFile
# List is required by upload_multiple_with_metadata below; the original
# snippet imported only Optional, which made the example fail standalone.
from typing import List, Optional

app = FastAPI()


@app.post("/upload-with-metadata/")
async def upload_with_metadata(
    title: str = Form(...),
    description: Optional[str] = Form(None),
    tags: str = Form(...),
    file: UploadFile = File(...)
):
    """Upload file with additional form data (title, description, tags)."""
    content = await file.read()
    # Process metadata: tags arrive as a single comma-separated string
    tag_list = [tag.strip() for tag in tags.split(",")]
    # Save file and metadata
    file_record = {
        "filename": file.filename,
        "title": title,
        "description": description,
        "tags": tag_list,
        "content_type": file.content_type,
        "size": len(content),
        "upload_date": "2023-01-01T00:00:00Z"
    }
    # Save to database (your logic here) — save_file_record is a
    # placeholder you must implement; it is not defined in this example.
    file_id = save_file_record(file_record, content)
    return {"file_id": file_id, "metadata": file_record}


@app.post("/upload-multiple-with-metadata/")
async def upload_multiple_with_metadata(
    category: str = Form(...),
    files: List[UploadFile] = File(...)
):
    """Upload multiple files sharing one metadata category."""
    results = []
    for file in files:
        content = await file.read()
        file_record = {
            "filename": file.filename,
            "category": category,
            "content_type": file.content_type,
            "size": len(content)
        }
        results.append(file_record)
    return {"category": category, "files": results, "total": len(files)}

from fastapi import FastAPI, File, UploadFile, BackgroundTasks
import hashlib
import aiofiles
import os

app = FastAPI()


async def process_large_file_stream(file: UploadFile, output_path: str):
    """Copy an uploaded file to output_path in chunks.

    Avoids loading the whole upload into memory; returns the total size
    and an MD5 digest (used here as a checksum, not for security).
    """
    hash_md5 = hashlib.md5()
    total_size = 0
    # Create output file
    async with aiofiles.open(output_path, "wb") as output_file:
        # Process file in chunks
        while True:
            chunk = await file.read(8192)  # 8KB chunks
            if not chunk:
                break
            # Update hash and size
            hash_md5.update(chunk)
            total_size += len(chunk)
            # Write processed chunk (could apply transformations here)
            await output_file.write(chunk)
    return {
        "total_size": total_size,
        "md5_hash": hash_md5.hexdigest()
    }


@app.post("/upload-large/")
async def upload_large_file(
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = None
):
    """Handle large file upload with streaming."""
    output_path = os.path.join("uploads", f"large_{file.filename}")
    # Process file in streaming fashion (completes before responding)
    result = await process_large_file_stream(file, output_path)
    # Add background task for further processing if needed; it only
    # receives the saved path, so it does not depend on the request body.
    if background_tasks:
        background_tasks.add_task(post_process_file, output_path)
    return {
        "filename": file.filename,
        "saved_path": output_path,
        "processing_result": result
    }


def post_process_file(file_path: str):
    """Background task for additional file processing."""
    print(f"Post-processing file: {file_path}")
    # Additional processing logic (virus scan, format conversion, etc.)

from fastapi import FastAPI, File, UploadFile, HTTPException
from PIL import Image
import io
import os

app = FastAPI()


def validate_image(file: UploadFile) -> None:
    """Validate that the uploaded file declares an image content type.

    content_type can be None, so guard before calling startswith —
    the original example raised AttributeError in that case.
    """
    if not file.content_type or not file.content_type.startswith("image/"):
        raise HTTPException(
            status_code=400,
            detail="File must be an image"
        )


def resize_image(image_data: bytes, max_width: int = 800, max_height: int = 600) -> bytes:
    """Resize image in-memory while maintaining aspect ratio."""
    image = Image.open(io.BytesIO(image_data))
    # thumbnail() resizes in place, never enlarging past the bounds
    image.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
    # Re-encode in the original format; fall back to PNG if the format
    # attribute was lost during processing (save() requires a format).
    output = io.BytesIO()
    image.save(output, format=image.format or "PNG")
    return output.getvalue()


@app.post("/upload-image/")
async def upload_image(
    file: UploadFile = File(...),
    resize: bool = False,
    max_width: int = 800,
    max_height: int = 600
):
    """Upload and optionally resize image."""
    validate_image(file)
    # Read image data
    image_data = await file.read()
    # Get image info
    image = Image.open(io.BytesIO(image_data))
    original_size = image.size
    # Resize if requested
    if resize:
        image_data = resize_image(image_data, max_width, max_height)
        resized_image = Image.open(io.BytesIO(image_data))
        new_size = resized_image.size
    else:
        new_size = original_size
    # Save processed image. basename() strips client-supplied directory
    # components (path traversal protection).
    filename = f"processed_{os.path.basename(file.filename or 'image')}"
    file_path = os.path.join("uploads", filename)
    with open(file_path, "wb") as f:
        f.write(image_data)
    return {
        "filename": filename,
        "original_size": {"width": original_size[0], "height": original_size[1]},
        "final_size": {"width": new_size[0], "height": new_size[1]},
        "resized": resize,
        "file_size": len(image_data),
        "saved_path": file_path
    }

from fastapi import FastAPI, File, UploadFile, HTTPException
import csv
import io
from typing import List, Dict

app = FastAPI()


async def process_csv(file: UploadFile) -> List[Dict]:
    """Parse an uploaded CSV file into validated row dicts.

    Raises HTTPException(400) for wrong content type or a missing
    required 'name' field.
    """
    if file.content_type != "text/csv":
        raise HTTPException(
            status_code=400,
            detail="File must be a CSV file"
        )
    # Read file content
    content = await file.read()
    # Decode and parse CSV
    csv_data = content.decode("utf-8")
    csv_reader = csv.DictReader(io.StringIO(csv_data))
    records = []
    for row_num, row in enumerate(csv_reader, start=2):  # Start at 2 (header is row 1)
        # Validate required fields
        if not row.get("name"):
            raise HTTPException(
                status_code=400,
                detail=f"Missing 'name' field in row {row_num}"
            )
        # Process row data
        processed_row = {
            "name": row["name"].strip(),
            "email": row.get("email", "").strip(),
            "age": int(row["age"]) if row.get("age") and row["age"].isdigit() else None,
            "row_number": row_num
        }
        records.append(processed_row)
    return records


@app.post("/upload-csv/")
async def upload_csv(file: UploadFile = File(...)):
    """Upload and process CSV file."""
    try:
        records = await process_csv(file)
        # Save processed data (your logic here) — save_csv_records is a
        # placeholder you must implement; it is not defined here.
        saved_count = save_csv_records(records)
        return {
            "filename": file.filename,
            "total_records": len(records),
            "saved_records": saved_count,
            "sample_records": records[:3]  # Show first 3 records
        }
    except HTTPException:
        # Propagate validation errors unchanged; the original example
        # re-wrapped them, mangling their detail messages.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"CSV processing error: {str(e)}")

from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
import os
import mimetypes

app = FastAPI()


@app.get("/download/{filename}")
async def download_file(filename: str):
    """Download file by filename."""
    file_path = os.path.join("uploads", filename)
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")
    # Detect content type; guess_type returns None for unknown extensions
    content_type, _ = mimetypes.guess_type(file_path)
    return FileResponse(
        path=file_path,
        filename=filename,
        media_type=content_type or "application/octet-stream"
    )


@app.get("/download-stream/{filename}")
async def download_file_stream(filename: str):
    """Download large file as stream."""
    file_path = os.path.join("uploads", filename)
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="File not found")

    def iter_file():
        # Generator keeps memory bounded regardless of file size
        with open(file_path, "rb") as file:
            while True:
                chunk = file.read(8192)  # 8KB chunks
                if not chunk:
                    break
                yield chunk

    content_type, _ = mimetypes.guess_type(file_path)
    return StreamingResponse(
        iter_file(),
        media_type=content_type or "application/octet-stream",
        headers={"Content-Disposition": f"attachment; filename={filename}"}
    )

# HTTPException is included here because the progress example below
# raises it in its 404 branch.
from fastapi import FastAPI, File, UploadFile, BackgroundTasks, HTTPException
import aiofiles
import os
from typing import Dict
import uuid

app = FastAPI()

# Store upload progress (in-memory; lost on restart, per-process only)
upload_progress: Dict[str, Dict] = {}


async def save_file_with_progress(
    file: UploadFile,
    file_path: str,
    upload_id: str
):
    """Save file to disk in chunks, updating upload_progress as it goes."""
    total_size = 0
    chunk_size = 8192
    # Initialize progress
    upload_progress[upload_id] = {
        "filename": file.filename,
        "total_bytes": 0,
        "bytes_written": 0,
        "status": "uploading",
        "percentage": 0
    }
    async with aiofiles.open(file_path, "wb") as output_file:
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            await output_file.write(chunk)
            total_size += len(chunk)
            # Update progress. The total size is unknown while streaming,
            # so this percentage is a placeholder that approaches (but
            # never reaches) 100 until completion sets it explicitly.
            upload_progress[upload_id]["bytes_written"] = total_size
            upload_progress[upload_id]["percentage"] = min(100, (total_size / (total_size + 1)) * 100)
    # Mark as complete
    upload_progress[upload_id]["status"] = "completed"
    upload_progress[upload_id]["total_bytes"] = total_size
    upload_progress[upload_id]["percentage"] = 100


@app.post("/upload-with-progress/")
async def upload_with_progress(
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = None
):
    """Upload file with progress tracking."""
    upload_id = str(uuid.uuid4())
    file_path = os.path.join("uploads", f"{upload_id}_{file.filename}")
    # Start upload in background.
    # NOTE(review): reading an UploadFile from a background task assumes
    # the underlying spooled file is still open after the response is
    # sent — verify against your FastAPI/Starlette version, which may
    # close request files when the request completes.
    background_tasks.add_task(
        save_file_with_progress,
        file,
        file_path,
        upload_id
    )
    return {
        "upload_id": upload_id,
        "filename": file.filename,
        "status": "upload_started"
    }


@app.get("/upload-progress/{upload_id}")
async def get_upload_progress(upload_id: str):
    """Get upload progress by ID."""
    if upload_id not in upload_progress:
        raise HTTPException(status_code=404, detail="Upload ID not found")
    return upload_progress[upload_id]

from fastapi import FastAPI, File, UploadFile, HTTPException, Depends
import hashlib
import os
import magic # python-magic library for MIME type detection
from typing import Set
app = FastAPI()
# Security configuration
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
ALLOWED_MIME_TYPES: Set[str] = {
"image/jpeg", "image/png", "image/gif",
"application/pdf", "text/plain"
}
BLOCKED_EXTENSIONS: Set[str] = {
".exe", ".bat", ".cmd", ".com", ".pif", ".scr", ".vbs", ".js"
}
def scan_file_security(file_content: bytes, filename: str) -> None:
"""Perform security checks on uploaded file."""
# Check file extension
file_ext = os.path.splitext(filename)[1].lower()
if file_ext in BLOCKED_EXTENSIONS:
raise HTTPException(
status_code=400,
detail=f"File extension {file_ext} is not allowed for security reasons"
)
# Detect actual MIME type from content
mime_type = magic.from_buffer(file_content, mime=True)
if mime_type not in ALLOWED_MIME_TYPES:
raise HTTPException(
status_code=400,
detail=f"File type {mime_type} is not allowed"
)
# Check for embedded executables or suspicious patterns
suspicious_patterns = [
b"MZ", # PE executable header
b"\x7fELF", # ELF executable header
b"<script", # JavaScript in uploads
b"<?php" # PHP code in uploads
]
for pattern in suspicious_patterns:
if pattern in file_content:
raise HTTPException(
status_code=400,
detail="File contains suspicious content"
)
def generate_secure_filename(original_filename: str) -> str:
"""Generate secure filename to prevent directory traversal."""
# Remove directory separators and dangerous characters
safe_chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789.-_"
filename = "".join(c for c in original_filename if c in safe_chars)
# Ensure filename is not empty and has reasonable length
if not filename or len(filename) > 255:
filename = f"upload_{hashlib.md5(original_filename.encode()).hexdigest()[:8]}"
return filename
@app.post("/secure-upload/")
async def secure_upload(
file: UploadFile = File(...),
current_user = Depends(get_current_user) # Require authentication
):
"""Secure file upload with comprehensive validation."""
# Read file content
content = await file.read()
# Check file size
if len(content) > MAX_FILE_SIZE:
raise HTTPException(
status_code=413,
detail=f"File size exceeds maximum allowed size of {MAX_FILE_SIZE} bytes"
)
# Perform security scans
scan_file_security(content, file.filename)
# Generate secure filename
secure_filename = generate_secure_filename(file.filename)
# Calculate file hash for integrity
file_hash = hashlib.sha256(content).hexdigest()
# Save file with secure path
user_dir = os.path.join("uploads", f"user_{current_user['id']}")
os.makedirs(user_dir, exist_ok=True)
file_path = os.path.join(user_dir, secure_filename)
with open(file_path, "wb") as f:
f.write(content)
# Log upload for audit
log_file_upload(current_user["id"], secure_filename, len(content), file_hash)
return {
"filename": secure_filename,
"original_filename": file.filename,
"size": len(content),
"hash": file_hash,
"content_type": file.content_type,
"saved_path": file_path
}Install with Tessl CLI
npx tessl i tessl/pypi-fastapi-slim