FastAPI framework, high performance, easy to learn, fast to code, ready for production - slim version without standard dependencies
—
FastAPI's background task system allows execution of functions after the HTTP response is sent to the client. This enables operations like sending emails, logging, file processing, or cleanup tasks without making the client wait for these operations to complete.
Container for managing multiple background tasks that execute after response completion.
class BackgroundTasks:
"""
Container for background tasks to be executed after response is sent.
Background tasks run in the same process after the HTTP response
is delivered to the client, allowing for cleanup, logging, or
notification operations without blocking the response.
"""
def add_task(
self,
func: Callable[..., Any],
*args: Any,
**kwargs: Any
) -> None:
"""
Add a function to be executed as a background task.
Parameters:
- func: Function to execute (can be sync or async)
- args: Positional arguments to pass to the function
- kwargs: Keyword arguments to pass to the function
Behaviors:
- Tasks execute in the order they were added
- Async functions are awaited, sync functions run in thread pool
- Exceptions in background tasks are logged but don't affect response
- Tasks run after response is sent but before connection closes
- Multiple tasks can be added and will execute sequentially
"""Background tasks support both synchronous and asynchronous functions with automatic handling.
# Sync background task function
def sync_background_task(param1: str, param2: int):
    """Signature sketch of a synchronous background task.

    Parameters:
    - param1, param2: Function-specific parameters

    Note: Sync functions run in thread pool to avoid blocking
    """
# Async background task function
async def async_background_task(param1: str, param2: int):
    """Signature sketch of an asynchronous background task.

    Parameters:
    - param1, param2: Function-specific parameters

    Note: Async functions are awaited directly
    """


from fastapi import FastAPI, BackgroundTasks
import logging
app = FastAPI()
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Severity names accepted by write_log, mapped to the stdlib logging levels.
_LOG_LEVELS = {
    "debug": logging.DEBUG,
    "info": logging.INFO,
    "warning": logging.WARNING,
    "error": logging.ERROR,
    "critical": logging.CRITICAL,
}


def write_log(message: str, level: str = "info"):
    """Simple logging background task.

    Parameters:
    - message: Text to write to the module logger
    - level: Severity name ("debug", "info", "warning", "error" or
      "critical"), matched case-insensitively

    An unrecognised level name is logged at INFO. Previously only "info",
    "error" and "warning" were handled and any other value dropped the
    message without a trace.
    """
    severity = _LOG_LEVELS.get(level.lower(), logging.INFO)
    logger.log(severity, message)
def send_notification(user_email: str, message: str):
    """Simulate sending an email notification by printing it.

    Parameters:
    - user_email: Address the notification is meant for
    - message: Notification text
    """
    # A real application would hand the message to an email service here
    # (the original sketch hints at a ~2 second sending delay).
    outgoing = f"Sending email to {user_email}: {message}"
    print(outgoing)
@app.post("/items/")
async def create_item(item_data: dict, background_tasks: BackgroundTasks):
    """Create an item, then queue a log entry and an admin notification.

    The item is stored through ``save_item`` before any task is queued; the
    queued tasks run after the response has been sent.
    """
    item_id = save_item(item_data)  # Your item saving logic

    log_line = f"Item {item_id} created successfully"
    admin_notice = f"New item created: {item_data.get('name', 'Unknown')}"

    # Queue the follow-up work; the response below does not wait for it.
    background_tasks.add_task(write_log, log_line, "info")
    background_tasks.add_task(send_notification, "admin@example.com", admin_notice)

    return {"item_id": item_id, "status": "created"}
@app.delete("/items/{item_id}")
async def delete_item(item_id: int, background_tasks: BackgroundTasks):
    """Delete an item and queue a log entry describing the deletion."""
    item_name = delete_item_from_db(item_id)  # Your deletion logic

    # The deletion is logged in the background, after the response.
    deletion_note = f"Item {item_id} ({item_name}) deleted"
    background_tasks.add_task(write_log, deletion_note, "info")

    return {"message": "Item deleted"}


from fastapi import FastAPI, BackgroundTasks
import asyncio
import aiohttp
import aiofiles
app = FastAPI()
async def async_log_to_file(message: str, filename: str = "app.log"):
    """Async background task for file logging.

    Appends one ``<ISO timestamp>: <message>`` line to ``filename``.

    Parameters:
    - message: Text to append
    - filename: File opened in append mode (default "app.log")
    """
    # Imported locally: this example's import block (asyncio, aiohttp,
    # aiofiles) does not bring datetime into scope.
    from datetime import datetime

    async with aiofiles.open(filename, mode="a") as file:
        timestamp = datetime.now().isoformat()
        await file.write(f"{timestamp}: {message}\n")
async def send_webhook(webhook_url: str, data: dict):
    """Async background task that POSTs ``data`` as JSON to ``webhook_url``.

    The outcome is only printed: status 200 is reported as success, any
    other status as a failure, and any exception raised while posting is
    caught and printed rather than propagated.
    """
    async with aiohttp.ClientSession() as http:
        try:
            async with http.post(webhook_url, json=data) as reply:
                if reply.status != 200:
                    print(f"Webhook failed: {reply.status}")
                else:
                    print(f"Webhook sent successfully to {webhook_url}")
        except Exception as exc:
            # Best effort: a failing webhook must not break the task chain.
            print(f"Webhook error: {exc}")
async def process_image_async(image_path: str, user_id: int):
    """Async background task for image processing.

    Prints a start message, waits two seconds as a stand-in for the real
    processing, records the "processed" status through
    ``update_user_image_status`` and prints a completion message.
    ``image_path`` is accepted but not used by this sketch.
    """
    print(f"Starting image processing for user {user_id}")

    simulated_duration = 2  # seconds; placeholder for the real work
    await asyncio.sleep(simulated_duration)

    # Persist the processing result for the user.
    await update_user_image_status(user_id, "processed")
    print(f"Image processing completed for user {user_id}")
@app.post("/upload-image/")
async def upload_image(
    image_data: dict,
    user_id: int,
    background_tasks: BackgroundTasks
):
    """Store an uploaded image and queue the asynchronous follow-up work.

    Three async tasks are queued in order: a file log entry, the image
    processing and a webhook call.
    """
    # The image is saved immediately, before the response.
    image_path = save_image(image_data)  # Your image saving logic

    upload_note = f"Image uploaded by user {user_id}: {image_path}"
    webhook_payload = {"event": "image_uploaded", "user_id": user_id, "image_path": image_path}

    background_tasks.add_task(async_log_to_file, upload_note)
    background_tasks.add_task(process_image_async, image_path, user_id)
    background_tasks.add_task(
        send_webhook,
        "https://external-service.com/webhook",
        webhook_payload
    )

    return {"message": "Image uploaded", "status": "processing"}


from fastapi import FastAPI, BackgroundTasks
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
def risky_background_task(data: dict):
    """Background task that might fail.

    Raises a simulated ValueError when ``data`` has a truthy "cause_error"
    entry; otherwise prints the data. Any exception is caught, logged
    through the module logger and forwarded to ``send_error_notification``,
    so nothing propagates to the caller.
    """
    try:
        # Simulate some risky operation
        if data.get("cause_error"):
            raise ValueError("Simulated error in background task")
        print(f"Processing data: {data}")
        # Actual processing logic here
    except Exception as exc:
        # Errors in background tasks don't affect the response, so they
        # are reported here instead of being raised.
        detail = str(exc)
        logger.error(f"Background task failed: {detail}")
        # Could also update a database, etc.
        send_error_notification(detail)
def send_error_notification(error_message: str):
    """Report a background task error by printing a notification line."""
    notification = f"ERROR NOTIFICATION: {error_message}"
    print(notification)
def cleanup_on_error(resource_id: str):
    """Clean up the given resource after a failed main task (prints only)."""
    notice = f"Cleaning up resource: {resource_id}"
    print(notice)
@app.post("/process-data/")
async def process_data(data: dict, background_tasks: BackgroundTasks):
    """Process ``data`` and queue the follow-up task on success.

    A resource is created first. If the main processing raises, the
    resource is cleaned up and the exception is re-raised so the client
    receives an error response.
    """
    resource_id = create_resource()  # Your resource creation logic
    try:
        # Main processing
        result = process_main_data(data)  # Your main logic
    except Exception:
        # Clean up right away instead of queueing a background task: queued
        # tasks are attached to the response this handler returns, and
        # re-raising means no such response exists, so a queued cleanup
        # would never run.
        cleanup_on_error(resource_id)
        raise  # Re-raise to return error response

    # Add successful processing background task
    background_tasks.add_task(risky_background_task, data)
    return {"result": result, "resource_id": resource_id}


from fastapi import FastAPI, BackgroundTasks
from typing import List
import json
app = FastAPI()
def step1_validate_data(order_id: int, data: dict):
    """First step: validate order data.

    Raises:
    - ValueError: when ``data`` has no truthy "customer_id" entry
    """
    print(f"Step 1: Validating order {order_id}")

    # Validation logic here
    customer_id = data.get("customer_id")
    if not customer_id:
        raise ValueError("Customer ID is required")

    print(f"Order {order_id} validation completed")
def step2_update_inventory(order_id: int, items: List[dict]):
    """Second step: update inventory.

    Prints one line per item; each item must provide "id" and "quantity".
    """
    print(f"Step 2: Updating inventory for order {order_id}")

    for entry in items:
        # Update inventory logic
        item_id, quantity = entry['id'], entry['quantity']
        print(f"Updated inventory for item {item_id}: -{quantity}")

    print(f"Inventory update completed for order {order_id}")
def step3_send_confirmation(order_id: int, customer_email: str):
    """Third step: send confirmation email (this sketch only prints)."""
    announcement = f"Step 3: Sending confirmation for order {order_id} to {customer_email}"
    print(announcement)

    # Email sending logic here

    print(f"Confirmation email sent for order {order_id}")
def step4_notify_fulfillment(order_id: int, items: List[dict]):
    """Fourth step: notify fulfillment center.

    Builds a payload with the order id, its items and a "standard"
    priority, and prints it serialized as JSON.
    """
    print(f"Step 4: Notifying fulfillment center for order {order_id}")

    # Send to fulfillment system
    serialized = json.dumps(
        {"order_id": order_id, "items": items, "priority": "standard"}
    )
    print(f"Fulfillment notification sent: {serialized}")
@app.post("/orders/")
async def create_order(order_data: dict, background_tasks: BackgroundTasks):
    """Create an order record and queue the four processing steps.

    The steps are queued in the order validate, update inventory, send
    confirmation, notify fulfillment, and execute in that order.
    """
    # The order record is created immediately.
    order_id = create_order_record(order_data)  # Your order creation logic

    # Each entry is (step, payload); every step is called as
    # step(order_id, payload) and runs in the order listed here.
    pipeline = (
        (step1_validate_data, order_data),
        (step2_update_inventory, order_data.get("items", [])),
        (step3_send_confirmation, order_data.get("customer_email")),
        (step4_notify_fulfillment, order_data.get("items", [])),
    )
    for step, payload in pipeline:
        background_tasks.add_task(step, order_id, payload)

    return {
        "order_id": order_id,
        "status": "created",
        "message": "Order created, processing in background"
    }


from fastapi import FastAPI, BackgroundTasks
import requests
import time
from datetime import datetime
app = FastAPI()
def sync_with_external_service(user_id: int, user_data: dict):
    """Sync user data with external CRM system.

    POSTs ``user_data`` as JSON to the CRM endpoint for ``user_id`` with a
    30 second timeout. A 200 status is recorded through
    ``log_sync_success``; any other status, or a ``requests`` error, is
    recorded through ``log_sync_failure``.
    """
    crm_endpoint = "https://crm.example.com/api/users"
    target_url = f"{crm_endpoint}/{user_id}"

    try:
        response = requests.post(target_url, json=user_data, timeout=30)
        status = response.status_code

        if status != 200:
            print(f"CRM sync failed for user {user_id}: {status}")
            log_sync_failure(user_id, status)
        else:
            print(f"User {user_id} synced with CRM successfully")
            log_sync_success(user_id)
    except requests.RequestException as exc:
        print(f"CRM sync error for user {user_id}: {exc}")
        log_sync_failure(user_id, str(exc))
def generate_report(report_type: str, user_id: int, filters: dict):
    """Generate report in background.

    Sleeps five seconds as a stand-in for the real processing, builds the
    report metadata, stores it through ``save_report`` and tells the user
    through ``notify_report_ready``.
    """
    print(f"Starting {report_type} report generation for user {user_id}")

    # Simulate report generation
    time.sleep(5)  # This would be actual report processing

    report_data = dict(
        type=report_type,
        user_id=user_id,
        filters=filters,
        generated_at=datetime.now().isoformat(),
        status="completed",
    )

    # Store the report, then let the user know it is ready.
    report_id = save_report(report_data)  # Your storage logic
    notify_report_ready(user_id, report_id)

    print(f"Report {report_id} generated successfully")
def cleanup_temp_files(file_paths: List[str]):
    """Clean up temporary files.

    Removes every path in ``file_paths``. A path that cannot be removed
    (missing, permission denied, ...) is reported and skipped so the
    remaining files are still cleaned up.

    Parameters:
    - file_paths: Paths of the files to delete
    """
    # Imported locally: no import block of these examples provides os, so
    # os.remove raised NameError, which `except OSError` does not catch.
    import os

    for file_path in file_paths:
        try:
            os.remove(file_path)
        except OSError as e:
            print(f"Failed to remove {file_path}: {e}")
        else:
            print(f"Removed temp file: {file_path}")
@app.post("/users/{user_id}")
async def update_user(
    user_id: int,
    user_data: dict,
    background_tasks: BackgroundTasks
):
    """Update a user record and queue the sync with the external service."""
    # The main database is updated before the response is built.
    updated_user = update_user_record(user_id, user_data)  # Your update logic

    # The external sync happens in the background.
    background_tasks.add_task(sync_with_external_service, user_id, user_data)

    return {"user": updated_user, "status": "updated"}
@app.post("/reports/generate")
async def request_report(
    report_request: dict,
    current_user_id: int,
    background_tasks: BackgroundTasks
):
    """Validate a report request and queue its generation.

    Raises:
    - HTTPException (400): when the request has no truthy "type" entry
    """
    # Imported locally: this example's import block only brings in FastAPI
    # and BackgroundTasks, so the validation path raised NameError.
    from fastapi import HTTPException

    # Validate request immediately
    if not report_request.get("type"):
        raise HTTPException(status_code=400, detail="Report type is required")

    # Start report generation in background
    background_tasks.add_task(
        generate_report,
        report_request["type"],
        current_user_id,
        report_request.get("filters", {})
    )

    return {
        "message": "Report generation started",
        "status": "processing",
        "estimated_time": "5-10 minutes"
    }
@app.post("/files/process")
async def process_files(
    file_data: dict,
    background_tasks: BackgroundTasks
):
    """Process uploaded file data and queue removal of the temp files."""
    # Temp files are created and processed before the response is built.
    temp_files = create_temp_files(file_data)  # Your file processing
    processed_data = process_file_data(temp_files)  # Your processing logic

    # Their removal is deferred to the background.
    background_tasks.add_task(cleanup_temp_files, temp_files)

    temp_files_count = len(temp_files)
    return {"processed_data": processed_data, "temp_files_count": temp_files_count}


from fastapi import FastAPI, BackgroundTasks, Depends
from sqlalchemy.orm import Session
app = FastAPI()
def update_user_stats(user_id: int, action: str, db_session: Session):
    """Update user statistics in background.

    Loads the ``UserStats`` row for ``user_id`` (creating and adding one
    when none is found), bumps the counter and timestamp for a "login" or
    "purchase" action, and commits. On any exception the transaction is
    rolled back and the failure printed. The session is always closed.
    """
    try:
        record = db_session.query(UserStats).filter(UserStats.user_id == user_id).first()
        if not record:
            record = UserStats(user_id=user_id)
            db_session.add(record)

        # NOTE(review): assumes the counters of a freshly created row are
        # already numeric before the first flush - confirm in the model.
        if action == "login":
            record.login_count += 1
            record.last_login = datetime.now()
        elif action == "purchase":
            record.purchase_count += 1
            record.last_purchase = datetime.now()

        db_session.commit()
        print(f"Updated stats for user {user_id}: {action}")
    except Exception as exc:
        db_session.rollback()
        print(f"Failed to update stats for user {user_id}: {exc}")
    finally:
        db_session.close()
def log_audit_event(user_id: int, event_type: str, details: dict, db_session: Session):
    """Log audit events in background.

    Stores an ``AuditLog`` row holding the JSON-serialized ``details`` and
    the current time, and commits. On any exception the transaction is
    rolled back and the failure printed. The session is always closed.
    """
    try:
        entry = AuditLog(
            user_id=user_id,
            event_type=event_type,
            details=json.dumps(details),
            timestamp=datetime.now(),
        )
        db_session.add(entry)
        db_session.commit()
        print(f"Logged audit event: {event_type} for user {user_id}")
    except Exception as exc:
        db_session.rollback()
        print(f"Failed to log audit event: {exc}")
    finally:
        db_session.close()
from fastapi import HTTPException, Request


@app.post("/login")
async def login(
    credentials: dict,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    *,
    request: Request,
):
    """Authenticate a user and queue the login bookkeeping.

    Parameters:
    - credentials: Login data passed to ``authenticate_user``
    - background_tasks: Queue for the statistics and audit tasks
    - db: Request-scoped database session (not handed to the tasks)
    - request: Incoming request, read for the client address and user
      agent. It is keyword-only so the existing parameter order is kept;
      the body previously used ``request`` without it being defined.

    Raises:
    - HTTPException (401): when authentication fails
    """
    # Authenticate user
    user = authenticate_user(credentials)  # Your auth logic
    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # The client address is not always available, so guard against None.
    client_host = request.client.host if request.client else None
    audit_details = {
        "ip_address": client_host,
        "user_agent": request.headers.get("user-agent"),
    }

    # Each task gets its own new session: the request-scoped one will be
    # closed when the request ends, and both tasks close what they receive.
    background_tasks.add_task(
        update_user_stats,
        user.id,
        "login",
        SessionLocal()
    )
    background_tasks.add_task(
        log_audit_event,
        user.id,
        "login",
        audit_details,
        SessionLocal()
    )

    return {"access_token": create_access_token(user.id)}


from fastapi import FastAPI, BackgroundTasks
from fastapi.testclient import TestClient
import pytest
from unittest.mock import patch, Mock
app = FastAPI()
# Mock functions for testing
sent_emails = []
logged_messages = []


def mock_send_email(recipient: str, subject: str, body: str):
    """Mock email function for testing: record the email in ``sent_emails``."""
    email = dict(recipient=recipient, subject=subject, body=body)
    sent_emails.append(email)


def mock_log_message(message: str):
    """Mock logging function for testing: record the message in ``logged_messages``."""
    logged_messages.append(message)
@app.post("/test-endpoint")
async def test_endpoint(data: dict, background_tasks: BackgroundTasks):
    """Endpoint under test: queues a mock email and a mock log entry."""
    email_body = f"Test body: {data.get('message', '')}"
    background_tasks.add_task(mock_send_email, "test@example.com", "Test Subject", email_body)

    log_entry = f"Processed data: {data}"
    background_tasks.add_task(mock_log_message, log_entry)

    return {"status": "success"}
def test_background_tasks():
    """Check that both background tasks of /test-endpoint ran."""
    # Start from empty recorders so earlier tests cannot leak in.
    for recorder in (sent_emails, logged_messages):
        recorder.clear()

    client = TestClient(app)
    response = client.post("/test-endpoint", json={"message": "test"})

    # The response itself
    assert response.status_code == 200
    assert response.json() == {"status": "success"}

    # The background tasks
    # Note: In TestClient, background tasks run synchronously
    assert len(sent_emails) == 1
    email = sent_emails[0]
    assert email["recipient"] == "test@example.com"
    assert email["subject"] == "Test Subject"
    assert "test" in email["body"]

    assert len(logged_messages) == 1
    assert "test" in logged_messages[0]
@patch('your_module.actual_send_email')
def test_background_tasks_with_mocks(mock_send_email):
"""Test background tasks with proper mocking."""
client = TestClient(app)
response = client.post("/test-endpoint", json={"message": "test"})
assert response.status_code == 200
# Verify the mock was called
mock_send_email.assert_called_once_with(
"test@example.com",
"Test Subject",
"Test body: test"
)Install with Tessl CLI
npx tessl i tessl/pypi-fastapi-slim