CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-fastapi-slim

FastAPI framework, high performance, easy to learn, fast to code, ready for production - slim version without standard dependencies

Pending
Overview
Eval results
Files

docs/background-tasks.md

Background Tasks

FastAPI's background task system allows execution of functions after the HTTP response is sent to the client. This enables operations like sending emails, logging, file processing, or cleanup tasks without making the client wait for these operations to complete.

Capabilities

BackgroundTasks Class

Container for managing multiple background tasks that execute after response completion.

class BackgroundTasks:
    """
    Collect callables to run once the HTTP response has been sent.

    Tasks run in the same process as the application, after the response
    is delivered to the client, so cleanup, logging, or notification work
    does not delay the response.

    This is an interface summary: the method body is intentionally omitted.
    """

    def add_task(
        self,
        func: Callable[..., Any],
        *args: Any,
        **kwargs: Any
    ) -> None:
        """
        Queue ``func`` to be called as a background task.

        Parameters:
        - func: Callable to execute (can be sync or async)
        - args: Positional arguments forwarded to ``func``
        - kwargs: Keyword arguments forwarded to ``func``

        Returns:
        - None

        Behaviors:
        - Tasks execute sequentially, in the order they were added
        - Async functions are awaited, sync functions run in a thread pool
        - Tasks run after the response is sent
        - NOTE(review): an exception raised by a task cannot change the
          response that was already sent; whether later queued tasks still
          run after a failure should be confirmed against the Starlette
          implementation
        """

Background Task Execution

Background tasks support both synchronous and asynchronous functions with automatic handling.

# Sync background task function
def sync_background_task(param1: str, param2: int):
    """
    Template for a synchronous background task function.

    Parameters:
    - param1, param2: Placeholder, function-specific parameters

    Note: Sync functions run in a thread pool to avoid blocking
    """

# Async background task function
async def async_background_task(param1: str, param2: int):
    """
    Template for an asynchronous background task function.

    Parameters:
    - param1, param2: Placeholder, function-specific parameters

    Note: Async functions are awaited directly
    """

Usage Examples

Basic Background Tasks

from fastapi import FastAPI, BackgroundTasks
import logging

# Application instance that the route decorators below register on
app = FastAPI()

# Set up logging: INFO and above on the root logger, plus a module-level
# logger used by write_log
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def write_log(message: str, level: str = "info"):
    """Write ``message`` to the module logger at the requested severity.

    Parameters:
    - message: Text to log
    - level: Severity name, matched case-insensitively against "debug",
      "info", "warning", "error" and "critical"

    An unrecognised level falls back to INFO, so a typo in ``level`` never
    causes the message to be silently dropped.
    """
    severities = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    logger.log(severities.get(level.lower(), logging.INFO), message)

def send_notification(user_email: str, message: str):
    """Stand-in for an email sender: prints the recipient and the message."""
    # A real application would hand this off to an email service here
    outgoing = f"Sending email to {user_email}: {message}"
    print(outgoing)

@app.post("/items/")
async def create_item(item_data: dict, background_tasks: BackgroundTasks):
    """Save a new item, then queue a log entry and an admin notification."""
    # Store the item first; its id is needed by the log line and the response
    item_id = save_item(item_data)  # Your item saving logic

    # Both tasks are only queued here; they run after the response is sent
    creation_note = f"Item {item_id} created successfully"
    background_tasks.add_task(write_log, creation_note, "info")

    admin_message = f"New item created: {item_data.get('name', 'Unknown')}"
    background_tasks.add_task(send_notification, "admin@example.com", admin_message)

    return {"item_id": item_id, "status": "created"}

@app.delete("/items/{item_id}")
async def delete_item(item_id: int, background_tasks: BackgroundTasks):
    """Delete an item and queue a log entry describing the deletion."""
    # The helper's return value is used as the item name in the log line
    item_name = delete_item_from_db(item_id)  # Your deletion logic

    # Queue the log write so it happens after the response
    deletion_note = f"Item {item_id} ({item_name}) deleted"
    background_tasks.add_task(write_log, deletion_note, "info")

    return {"message": "Item deleted"}

Async Background Tasks

from fastapi import FastAPI, BackgroundTasks
import asyncio
import aiohttp
import aiofiles

# Application instance that the route decorator below registers on
app = FastAPI()

async def async_log_to_file(message: str, filename: str = "app.log"):
    """Append a timestamped line containing ``message`` to ``filename``.

    Parameters:
    - message: Text to record
    - filename: Path of the log file, opened in append mode
    """
    # Imported locally: this example's import list does not bring in
    # datetime, so the name would otherwise be undefined when the task runs.
    from datetime import datetime

    timestamp = datetime.now().isoformat()
    async with aiofiles.open(filename, mode="a") as file:
        await file.write(f"{timestamp}: {message}\n")

async def send_webhook(webhook_url: str, data: dict):
    """POST ``data`` as JSON to ``webhook_url`` and print the outcome."""
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(webhook_url, json=data) as response:
                # Only an exact 200 is reported as a success
                if response.status != 200:
                    print(f"Webhook failed: {response.status}")
                else:
                    print(f"Webhook sent successfully to {webhook_url}")
        except Exception as exc:
            # Best effort: any failure is printed, never raised
            print(f"Webhook error: {exc}")

async def process_image_async(image_path: str, user_id: int):
    """Simulate image processing, then mark the user's image as processed.

    ``image_path`` is accepted but not used by this simulation.
    """
    start_note = f"Starting image processing for user {user_id}"
    print(start_note)

    # Two-second pause standing in for the real processing work
    await asyncio.sleep(2)

    # Record the result before announcing completion
    await update_user_image_status(user_id, "processed")

    done_note = f"Image processing completed for user {user_id}"
    print(done_note)

@app.post("/upload-image/")
async def upload_image(
    image_data: dict,
    user_id: int,
    background_tasks: BackgroundTasks
):
    """Save an uploaded image and queue logging, processing and a webhook."""
    # The image itself is saved before responding
    image_path = save_image(image_data)  # Your image saving logic

    # Follow-up work as (function, positional arguments) pairs, listed in
    # the order the tasks should run
    follow_ups = [
        (async_log_to_file, (f"Image uploaded by user {user_id}: {image_path}",)),
        (process_image_async, (image_path, user_id)),
        (
            send_webhook,
            (
                "https://external-service.com/webhook",
                {"event": "image_uploaded", "user_id": user_id, "image_path": image_path},
            ),
        ),
    ]
    for task_func, task_args in follow_ups:
        background_tasks.add_task(task_func, *task_args)

    return {"message": "Image uploaded", "status": "processing"}

Error Handling in Background Tasks

from fastapi import FastAPI, BackgroundTasks
import logging

# Application instance that the route decorator below registers on
app = FastAPI()
# Module-level logger used by risky_background_task to report failures
logger = logging.getLogger(__name__)

def risky_background_task(data: dict):
    """Process ``data`` in the background, handling any failure locally."""
    try:
        # A truthy "cause_error" entry triggers the simulated failure
        should_fail = data.get("cause_error")
        if should_fail:
            raise ValueError("Simulated error in background task")

        # Actual processing logic here
        print(f"Processing data: {data}")

    except Exception as exc:
        # Record the failure (errors in background tasks don't affect response)
        logger.error(f"Background task failed: {exc}")

        # Further reactions (database update, etc.) could be added here
        send_error_notification(str(exc))

def send_error_notification(error_message: str):
    """Report a background task failure by printing a notification line."""
    notification = f"ERROR NOTIFICATION: {error_message}"
    print(notification)

def cleanup_on_error(resource_id: str):
    """Release a resource after the main task failed (prints a notice)."""
    notice = f"Cleaning up resource: {resource_id}"
    print(notice)

@app.post("/process-data/")
async def process_data(data: dict, background_tasks: BackgroundTasks):
    """Process ``data`` now and queue follow-up work for after the response.

    Parameters:
    - data: Request payload passed to the main processing step
    - background_tasks: Queue for work that runs after a successful response

    Returns:
    - Dict with the processing result and the id of the created resource

    Raises:
    - Whatever the main processing step raises, after the resource created
      for this request has been cleaned up
    """
    resource_id = create_resource()  # Your resource creation logic

    try:
        # Main processing
        result = process_main_data(data)  # Your main logic
    except Exception:
        # Clean up right here instead of queueing it: tasks added to the
        # injected BackgroundTasks are attached to the response built from
        # the endpoint's return value, so a task queued just before raising
        # would never run.
        cleanup_on_error(resource_id)
        raise  # Re-raise to return error response

    # Processing succeeded: queue the follow-up background task
    background_tasks.add_task(risky_background_task, data)

    return {"result": result, "resource_id": resource_id}

Complex Background Task Workflows

from fastapi import FastAPI, BackgroundTasks
from typing import List
import json

# Application instance that the route decorator below registers on
app = FastAPI()

def step1_validate_data(order_id: int, data: dict):
    """Workflow step 1: check that the order data names a customer.

    Raises ValueError when ``data`` has no truthy "customer_id".
    """
    print(f"Step 1: Validating order {order_id}")

    # Validation logic here
    customer_id = data.get("customer_id")
    if not customer_id:
        raise ValueError("Customer ID is required")

    print(f"Order {order_id} validation completed")

def step2_update_inventory(order_id: int, items: List[dict]):
    """Workflow step 2: go through the order's items and adjust inventory."""
    print(f"Step 2: Updating inventory for order {order_id}")

    # Update inventory logic; each item needs an "id" and a "quantity" key
    for entry in items:
        item_id, quantity = entry['id'], entry['quantity']
        print(f"Updated inventory for item {item_id}: -{quantity}")

    print(f"Inventory update completed for order {order_id}")

def step3_send_confirmation(order_id: int, customer_email: str):
    """Workflow step 3: send the order confirmation to the customer."""
    announcement = f"Step 3: Sending confirmation for order {order_id} to {customer_email}"
    print(announcement)

    # Email sending logic here
    completion = f"Confirmation email sent for order {order_id}"
    print(completion)

def step4_notify_fulfillment(order_id: int, items: List[dict]):
    """Workflow step 4: hand the order over to the fulfillment center."""
    print(f"Step 4: Notifying fulfillment center for order {order_id}")

    # Key order is kept as-is because it determines the field order of the
    # printed JSON
    payload = dict(order_id=order_id, items=items, priority="standard")
    serialized = json.dumps(payload)

    # Send to fulfillment system
    print(f"Fulfillment notification sent: {serialized}")

@app.post("/orders/")
async def create_order(order_data: dict, background_tasks: BackgroundTasks):
    """Record a new order and queue the four processing steps."""
    # Create order immediately
    order_id = create_order_record(order_data)  # Your order creation logic

    # Each step receives the order id plus one step-specific argument.
    # The steps are queued, and therefore executed, in this order.
    pipeline = (
        (step1_validate_data, order_data),
        (step2_update_inventory, order_data.get("items", [])),
        (step3_send_confirmation, order_data.get("customer_email")),
        (step4_notify_fulfillment, order_data.get("items", [])),
    )
    for step, step_argument in pipeline:
        background_tasks.add_task(step, order_id, step_argument)

    confirmation = {
        "order_id": order_id,
        "status": "created",
        "message": "Order created, processing in background"
    }
    return confirmation

Background Tasks with External Services

from fastapi import FastAPI, BackgroundTasks
import requests
import time
from datetime import datetime

# Application instance that the route decorators below register on
app = FastAPI()

def sync_with_external_service(user_id: int, user_data: dict):
    """Send a user's data to the external CRM and record the outcome."""
    crm_endpoint = "https://crm.example.com/api/users"
    target_url = f"{crm_endpoint}/{user_id}"

    try:
        response = requests.post(target_url, json=user_data, timeout=30)

        # Only an exact 200 counts as a successful sync
        if response.status_code != 200:
            print(f"CRM sync failed for user {user_id}: {response.status_code}")
            log_sync_failure(user_id, response.status_code)
        else:
            print(f"User {user_id} synced with CRM successfully")
            log_sync_success(user_id)

    except requests.RequestException as exc:
        # Request-level failures are reported and logged, not raised
        print(f"CRM sync error for user {user_id}: {exc}")
        log_sync_failure(user_id, str(exc))

def generate_report(report_type: str, user_id: int, filters: dict):
    """Produce a report in the background, store it, and notify the user."""
    print(f"Starting {report_type} report generation for user {user_id}")

    # Simulate report generation
    time.sleep(5)  # This would be actual report processing

    generated_at = datetime.now().isoformat()
    report_data = dict(
        type=report_type,
        user_id=user_id,
        filters=filters,
        generated_at=generated_at,
        status="completed",
    )

    # Save the report; the returned id is then used to notify the user
    report_id = save_report(report_data)  # Your storage logic
    notify_report_ready(user_id, report_id)

    print(f"Report {report_id} generated successfully")

def cleanup_temp_files(file_paths: List[str]):
    """Delete each temporary file, reporting successes and failures.

    Parameters:
    - file_paths: Paths of the files to remove

    A file that cannot be removed (already gone, permission denied, ...) is
    reported and skipped so the remaining files are still cleaned up.
    """
    # Imported locally: no import of os is visible for this example, and a
    # NameError at this point would not be caught by the OSError handler.
    import os

    for file_path in file_paths:
        try:
            os.remove(file_path)
        except OSError as e:
            print(f"Failed to remove {file_path}: {e}")
        else:
            print(f"Removed temp file: {file_path}")

@app.post("/users/{user_id}")
async def update_user(
    user_id: int,
    user_data: dict,
    background_tasks: BackgroundTasks
):
    """Update a user record and queue the external sync as a background task."""
    # The main database is updated before responding
    updated_user = update_user_record(user_id, user_data)  # Your update logic

    # The external sync is deferred to a background task
    background_tasks.add_task(sync_with_external_service, user_id, user_data)

    response_body = {"user": updated_user, "status": "updated"}
    return response_body

@app.post("/reports/generate")
async def request_report(
    report_request: dict,
    current_user_id: int,
    background_tasks: BackgroundTasks
):
    """Validate a report request and start generating it in the background.

    Parameters:
    - report_request: Request payload; must contain a non-empty "type" and
      may contain "filters"
    - current_user_id: Id of the user the report is generated for
    - background_tasks: Queue that receives the report generation task

    Returns:
    - Dict acknowledging that generation has started

    Raises:
    - HTTPException (400) when the request has no report type
    """
    # Imported locally: this example's import list only brings in FastAPI and
    # BackgroundTasks, so the validation branch would otherwise fail with a
    # NameError instead of returning a 400.
    from fastapi import HTTPException

    # Validate request immediately
    report_type = report_request.get("type")
    if not report_type:
        raise HTTPException(status_code=400, detail="Report type is required")

    # Start report generation in background
    background_tasks.add_task(
        generate_report,
        report_type,
        current_user_id,
        report_request.get("filters", {})
    )

    return {
        "message": "Report generation started",
        "status": "processing",
        "estimated_time": "5-10 minutes"
    }

@app.post("/files/process")
async def process_files(
    file_data: dict,
    background_tasks: BackgroundTasks
):
    """Process uploaded file data and queue removal of the temp files."""
    # Temp files are created and processed before responding
    temp_files = create_temp_files(file_data)  # Your file processing
    processed_data = process_file_data(temp_files)  # Your processing logic
    temp_files_count = len(temp_files)

    # Removing the temp files is deferred to a background task
    background_tasks.add_task(cleanup_temp_files, temp_files)

    return dict(processed_data=processed_data, temp_files_count=temp_files_count)

Background Tasks with Database Operations

import json
from datetime import datetime

from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException, Request
from sqlalchemy.orm import Session

# Application instance that the route decorator below registers on
app = FastAPI()

def update_user_stats(user_id: int, action: str, db_session: Session):
    """Update a user's activity statistics in the background.

    Parameters:
    - user_id: User whose statistics row is updated (created when missing)
    - action: "login" or "purchase"; any other value changes no counter
    - db_session: Session used by this task; it is always closed on exit

    Failures are rolled back and printed rather than raised.
    """
    try:
        # Update user statistics
        stats = db_session.query(UserStats).filter(UserStats.user_id == user_id).first()

        if stats is None:
            stats = UserStats(user_id=user_id)
            db_session.add(stats)

        # ``or 0`` below: on a row that was just constructed the counter may
        # still be None (column defaults are typically applied when the row
        # is inserted), and None + 1 would abort the whole update.
        if action == "login":
            stats.login_count = (stats.login_count or 0) + 1
            stats.last_login = datetime.now()
        elif action == "purchase":
            stats.purchase_count = (stats.purchase_count or 0) + 1
            stats.last_purchase = datetime.now()

        db_session.commit()
        print(f"Updated stats for user {user_id}: {action}")

    except Exception as e:
        db_session.rollback()
        print(f"Failed to update stats for user {user_id}: {e}")

    finally:
        db_session.close()

def log_audit_event(user_id: int, event_type: str, details: dict, db_session: Session):
    """Store one audit record in the background; the session is closed on exit."""
    try:
        # Details are stored as a JSON string
        serialized_details = json.dumps(details)
        entry = AuditLog(
            user_id=user_id,
            event_type=event_type,
            details=serialized_details,
            timestamp=datetime.now()
        )

        db_session.add(entry)
        db_session.commit()
        print(f"Logged audit event: {event_type} for user {user_id}")

    except Exception as exc:
        # Undo the partial write and report the failure instead of raising
        db_session.rollback()
        print(f"Failed to log audit event: {exc}")

    finally:
        db_session.close()

@app.post("/login")
async def login(
    credentials: dict,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    *,
    request: Request
):
    """Authenticate a user and queue statistics and audit logging.

    Parameters:
    - credentials: Login payload handed to the authentication helper
    - background_tasks: Queue that receives the two follow-up tasks
    - db: Request-scoped database session
    - request: Incoming request, injected by the framework; keyword-only so
      the positions of the existing parameters are unchanged. It supplies
      the client address and user agent for the audit record.

    Returns:
    - Dict containing the access token for the authenticated user

    Raises:
    - HTTPException (401) when authentication fails
    """
    # Authenticate user
    user = authenticate_user(credentials)  # Your auth logic

    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Each background task gets its own new session
    # (the original session will be closed when request ends)
    background_tasks.add_task(
        update_user_stats,
        user.id,
        "login",
        SessionLocal()
    )

    # request.client can be None, so the address is read defensively
    client = request.client
    login_details = {
        "ip_address": client.host if client else None,
        "user_agent": request.headers.get("user-agent"),
    }

    # Log login event in background
    background_tasks.add_task(
        log_audit_event,
        user.id,
        "login",
        login_details,
        SessionLocal()  # Another new session
    )

    return {"access_token": create_access_token(user.id)}

Testing Background Tasks

from fastapi import FastAPI, BackgroundTasks
from fastapi.testclient import TestClient
import pytest
from unittest.mock import patch, Mock

# Application under test
app = FastAPI()

# Mock functions for testing
# Module-level lists that record what the mock task functions received, so
# tests can inspect them after a request
sent_emails = []
logged_messages = []

def mock_send_email(recipient: str, subject: str, body: str):
    """Record an email in ``sent_emails`` instead of sending it."""
    record = dict(recipient=recipient, subject=subject, body=body)
    sent_emails.append(record)

def mock_log_message(message: str):
    """Record ``message`` in ``logged_messages`` so tests can inspect it."""
    logged_messages.append(message)

@app.post("/test-endpoint")
async def test_endpoint(data: dict, background_tasks: BackgroundTasks):
    """Queue the mock email and mock log tasks for the posted payload."""
    email_body = f"Test body: {data.get('message', '')}"
    background_tasks.add_task(mock_send_email, "test@example.com", "Test Subject", email_body)

    log_line = f"Processed data: {data}"
    background_tasks.add_task(mock_log_message, log_line)

    return {"status": "success"}

def test_background_tasks():
    """End-to-end check that both queued tasks ran for a single request."""
    # Start from empty recorders
    sent_emails.clear()
    logged_messages.clear()

    client = TestClient(app)
    response = client.post("/test-endpoint", json={"message": "test"})

    # The endpoint itself must succeed
    assert response.status_code == 200
    assert response.json() == {"status": "success"}

    # Note: In TestClient, background tasks run synchronously, so their
    # effects can be checked right away
    assert len(sent_emails) == 1
    email = sent_emails[0]
    assert email["recipient"] == "test@example.com"
    assert email["subject"] == "Test Subject"
    assert "test" in email["body"]

    assert len(logged_messages) == 1
    assert "test" in logged_messages[0]

@patch(f"{__name__}.mock_send_email")
def test_background_tasks_with_mocks(mock_send_email):
    """Check the arguments the endpoint passes to its email task.

    The patch target must be the name the endpoint looks up while handling
    the request, which here is this module's ``mock_send_email``. In a real
    project, patch the email function in the module that defines the
    endpoint (for example ``"your_module.actual_send_email"``).

    Parameters:
    - mock_send_email: Mock installed by ``patch`` in place of the email
      function for the duration of the test
    """
    client = TestClient(app)

    response = client.post("/test-endpoint", json={"message": "test"})

    assert response.status_code == 200

    # Verify the mock was called
    mock_send_email.assert_called_once_with(
        "test@example.com",
        "Test Subject",
        "Test body: test"
    )

Install with Tessl CLI

npx tessl i tessl/pypi-fastapi-slim

docs

api-routing.md

background-tasks.md

core-application.md

dependency-injection.md

exception-handling.md

file-handling.md

index.md

parameter-declaration.md

request-response.md

websocket-support.md

tile.json