- Spec files
pypi-fastapi
Describes: pkg:pypi/fastapi@0.116.x
- Description
- FastAPI framework, high performance, easy to learn, fast to code, ready for production
- Author
- tessl
- Last updated
background-tasks.md docs/
# Background Tasks

FastAPI provides built-in support for background tasks that execute after sending the HTTP response to the client. This allows you to perform time-consuming operations like sending emails, processing files, or updating databases without making the client wait for the response.

## Capabilities

### Background Tasks Class

Class for managing and executing background tasks after HTTP response completion.

```python { .api }
class BackgroundTasks:
    def __init__(self, tasks: List[Task] = None) -> None:
        """
        Background tasks container.

        Parameters:
        - tasks: Optional list of initial tasks to execute
        """
        self.tasks = tasks or []

    def add_task(
        self,
        func: Callable,
        *args: Any,
        **kwargs: Any
    ) -> None:
        """
        Add background task to execute after response.

        Parameters:
        - func: Function to execute in background
        - args: Positional arguments for the function
        - kwargs: Keyword arguments for the function
        """
```

### Task Execution Interface

Functions can be added as background tasks with any signature and parameters.

```python { .api }
def background_task_function(*args: Any, **kwargs: Any) -> None:
    """
    Background task function signature.

    Parameters:
    - args: Positional arguments passed from add_task
    - kwargs: Keyword arguments passed from add_task

    Note: Background tasks are executed synchronously after response
    """

async def async_background_task_function(*args: Any, **kwargs: Any) -> None:
    """
    Async background task function signature.

    Parameters:
    - args: Positional arguments passed from add_task
    - kwargs: Keyword arguments passed from add_task

    Note: Async background tasks are awaited after response
    """
```

### Background Tasks Dependency

Background tasks can be injected as dependencies into route handlers.

```python { .api }
def route_handler(
    background_tasks: BackgroundTasks,
    # other parameters...
) -> Any:
    """
    Route handler with background tasks dependency.

    Parameters:
    - background_tasks: BackgroundTasks instance for adding tasks

    The BackgroundTasks instance is automatically provided by FastAPI
    """
```

## Usage Examples

### Basic Background Task

```python
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_notification(email: str, message: str = ""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}\n"
        email_file.write(content)

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}
```

### Multiple Background Tasks

```python
from fastapi import FastAPI, BackgroundTasks
import time

app = FastAPI()

def slow_task_1(name: str):
    time.sleep(2)
    print(f"Task 1 completed for {name}")

def slow_task_2(name: str):
    time.sleep(3)
    print(f"Task 2 completed for {name}")

def cleanup_task():
    print("Cleanup completed")

@app.post("/process/{name}")
async def process_data(name: str, background_tasks: BackgroundTasks):
    # Add multiple background tasks
    background_tasks.add_task(slow_task_1, name)
    background_tasks.add_task(slow_task_2, name)
    background_tasks.add_task(cleanup_task)

    return {"message": f"Processing started for {name}"}
```

### Background Task with Email Sending

```python
from fastapi import FastAPI, BackgroundTasks
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

app = FastAPI()

def send_email(to_email: str, subject: str, body: str):
    # Email configuration (use environment variables in production)
    smtp_server = "smtp.gmail.com"
    smtp_port = 587
    sender_email = "your-email@gmail.com"
    sender_password = "your-password"

    try:
        # Create message
        message = MIMEMultipart()
        message["From"] = sender_email
        message["To"] = to_email
        message["Subject"] = subject

        message.attach(MIMEText(body, "plain"))

        # Send email
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(sender_email, sender_password)
            server.send_message(message)

        print(f"Email sent successfully to {to_email}")
    except Exception as e:
        print(f"Failed to send email: {str(e)}")

@app.post("/send-email/")
async def send_email_endpoint(
    to_email: str,
    subject: str,
    body: str,
    background_tasks: BackgroundTasks
):
    background_tasks.add_task(send_email, to_email, subject, body)
    return {"message": "Email will be sent in the background"}
```

### Background Task with File Processing

```python
import os
import csv
from typing import List
from fastapi import FastAPI, BackgroundTasks, UploadFile, File

app = FastAPI()

def process_csv_file(filename: str, user_id: int):
    try:
        with open(filename, 'r') as file:
            csv_reader = csv.DictReader(file)
            processed_rows = 0

            for row in csv_reader:
                # Process each row (simulate some work)
                process_csv_row(row, user_id)
                processed_rows += 1

        # Clean up temporary file
        os.remove(filename)

        # Log completion
        print(f"Processed {processed_rows} rows for user {user_id}")

        # Notify user (in a real app, you might update a database or send a webhook)
        notify_user_completion(user_id, processed_rows)

    except Exception as e:
        print(f"Error processing CSV: {str(e)}")
        notify_user_error(user_id, str(e))

def process_csv_row(row: dict, user_id: int):
    # Simulate row processing
    print(f"Processing row for user {user_id}: {row}")

def notify_user_completion(user_id: int, row_count: int):
    # In a real application, this might send a push notification or update a database
    print(f"Notifying user {user_id}: processed {row_count} rows")

def notify_user_error(user_id: int, error_message: str):
    print(f"Notifying user {user_id} of error: {error_message}")

@app.post("/upload-csv/{user_id}")
async def upload_csv(
    user_id: int,
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...)
):
    # Save uploaded file temporarily
    temp_filename = f"temp_{user_id}_{file.filename}"

    with open(temp_filename, "wb") as temp_file:
        content = await file.read()
        temp_file.write(content)

    # Process file in background
    background_tasks.add_task(process_csv_file, temp_filename, user_id)

    return {"message": f"File {file.filename} uploaded and will be processed in background"}
```

### Background Task with Database Operations

```python
from fastapi import FastAPI, BackgroundTasks
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from datetime import datetime

app = FastAPI()

# Database setup (simplified)
Base = declarative_base()
engine = create_engine("sqlite:///./test.db")
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

class ActivityLog(Base):
    __tablename__ = "activity_logs"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, index=True)
    action = Column(String)
    timestamp = Column(DateTime, default=datetime.utcnow)

Base.metadata.create_all(bind=engine)

def log_user_activity(user_id: int, action: str):
    db = SessionLocal()
    try:
        activity = ActivityLog(user_id=user_id, action=action)
        db.add(activity)
        db.commit()
        print(f"Logged activity: User {user_id} performed {action}")
    except Exception as e:
        print(f"Failed to log activity: {str(e)}")
        db.rollback()
    finally:
        db.close()

def update_user_statistics(user_id: int):
    db = SessionLocal()
    try:
        # Update user statistics based on recent activity
        # This is a placeholder for complex statistical calculations
        print(f"Updated statistics for user {user_id}")
    except Exception as e:
        print(f"Failed to update statistics: {str(e)}")
    finally:
        db.close()

@app.post("/user/{user_id}/action")
async def perform_user_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks
):
    # Log the activity in background
    background_tasks.add_task(log_user_activity, user_id, action)

    # Update user statistics in background
    background_tasks.add_task(update_user_statistics, user_id)

    return {"message": f"Action '{action}' recorded for user {user_id}"}
```

### Async Background Tasks

```python
import asyncio
import aiohttp
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

async def fetch_external_data(url: str, user_id: int):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                data = await response.json()

        # Process the fetched data
        await process_external_data(data, user_id)

        print(f"Successfully processed external data for user {user_id}")
    except Exception as e:
        print(f"Failed to fetch external data: {str(e)}")

async def process_external_data(data: dict, user_id: int):
    # Simulate async processing
    await asyncio.sleep(1)
    print(f"Processed data for user {user_id}: {len(data)} items")

async def send_webhook(webhook_url: str, payload: dict):
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(webhook_url, json=payload) as response:
                if response.status == 200:
                    print("Webhook sent successfully")
                else:
                    print(f"Webhook failed with status {response.status}")
    except Exception as e:
        print(f"Webhook error: {str(e)}")

@app.post("/trigger-external-fetch/{user_id}")
async def trigger_external_fetch(
    user_id: int,
    data_url: str,
    webhook_url: str,
    background_tasks: BackgroundTasks
):
    # Fetch external data in background
    background_tasks.add_task(fetch_external_data, data_url, user_id)

    # Send webhook notification in background
    payload = {"user_id": user_id, "action": "external_fetch_triggered"}
    background_tasks.add_task(send_webhook, webhook_url, payload)

    return {"message": f"External data fetch triggered for user {user_id}"}
```

### Background Tasks with Error Handling

```python
import logging
from fastapi import FastAPI, BackgroundTasks

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

def safe_background_task(task_name: str, *args, **kwargs):
    """Wrapper for background tasks with error handling"""
    try:
        logger.info(f"Starting background task: {task_name}")

        # Determine which task to run based on task_name
        if task_name == "send_notification":
            send_notification_task(*args, **kwargs)
        elif task_name == "process_data":
            process_data_task(*args, **kwargs)
        elif task_name == "cleanup":
            cleanup_task(*args, **kwargs)
        else:
            raise ValueError(f"Unknown task: {task_name}")

        logger.info(f"Completed background task: {task_name}")

    except Exception as e:
        logger.error(f"Background task {task_name} failed: {str(e)}")
        # In a real app, you might want to retry, alert admins, etc.

def send_notification_task(user_id: int, message: str):
    if not user_id:
        raise ValueError("User ID is required")
    print(f"Notification sent to user {user_id}: {message}")

def process_data_task(data_id: int):
    if data_id <= 0:
        raise ValueError("Invalid data ID")
    print(f"Processed data {data_id}")

def cleanup_task():
    print("Cleanup completed")

@app.post("/safe-task/{user_id}")
async def create_safe_task(user_id: int, message: str, background_tasks: BackgroundTasks):
    # Use the safe wrapper for error handling
    background_tasks.add_task(safe_background_task, "send_notification", user_id, message)
    background_tasks.add_task(safe_background_task, "cleanup")

    return {"message": "Tasks scheduled with error handling"}
```

### Background Tasks with Progress Tracking

```python
import time
from typing import Dict
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

# In-memory progress tracking (use Redis or database in production)
task_progress: Dict[str, dict] = {}

def long_running_task(task_id: str, items_count: int):
    task_progress[task_id] = {
        "status": "running",
        "progress": 0,
        "total": items_count,
        "message": "Starting task..."
    }

    try:
        for i in range(items_count):
            # Simulate work
            time.sleep(0.5)

            # Update progress
            task_progress[task_id].update({
                "progress": i + 1,
                "message": f"Processing item {i + 1} of {items_count}"
            })

        # Task completed
        task_progress[task_id].update({
            "status": "completed",
            "message": "Task completed successfully"
        })

    except Exception as e:
        task_progress[task_id].update({
            "status": "failed",
            "message": f"Task failed: {str(e)}"
        })

@app.post("/start-task/{task_id}")
async def start_task(task_id: str, items_count: int, background_tasks: BackgroundTasks):
    if task_id in task_progress:
        return {"error": "Task with this ID already exists"}

    background_tasks.add_task(long_running_task, task_id, items_count)

    return {
        "message": f"Task {task_id} started",
        "task_id": task_id,
        "check_progress_url": f"/task-progress/{task_id}"
    }

@app.get("/task-progress/{task_id}")
async def get_task_progress(task_id: str):
    if task_id not in task_progress:
        return {"error": "Task not found"}

    return task_progress[task_id]
```

### Background Tasks with Dependency Injection

```python
from fastapi import FastAPI, BackgroundTasks, Depends

app = FastAPI()

class EmailService:
    def send_email(self, to: str, subject: str, body: str):
        print(f"Sending email to {to}: {subject}")

class DatabaseService:
    def log_activity(self, user_id: int, action: str):
        print(f"Logging: User {user_id} performed {action}")

# Dependency providers
def get_email_service() -> EmailService:
    return EmailService()

def get_database_service() -> DatabaseService:
    return DatabaseService()

def notification_task(
    user_id: int,
    action: str,
    email_service: EmailService,
    db_service: DatabaseService
):
    # Use injected services in background task
    db_service.log_activity(user_id, action)
    email_service.send_email(
        f"user{user_id}@example.com",
        "Action Performed",
        f"You performed: {action}"
    )

@app.post("/action/{user_id}")
async def perform_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks,
    email_service: EmailService = Depends(get_email_service),
    db_service: DatabaseService = Depends(get_database_service)
):
    # Pass dependencies to background task
    background_tasks.add_task(
        notification_task,
        user_id,
        action,
        email_service,
        db_service
    )

    return {"message": f"Action {action} performed for user {user_id}"}
```