# Background Tasks

FastAPI's background task system runs functions after the HTTP response has been sent to the client. This lets operations such as sending emails, logging, file processing, or cleanup happen without making the client wait for them to complete.
3
4
## Capabilities
5
6
### BackgroundTasks Class
7
8
Container for managing multiple background tasks that execute after response completion.
9
10
```python { .api }
11
class BackgroundTasks:
    """
    Container for background tasks to be executed after response is sent.

    Background tasks run in the same process after the HTTP response
    is delivered to the client, allowing for cleanup, logging, or
    notification operations without blocking the response.
    """

    def add_task(
        self,
        func: Callable[..., Any],
        *args: Any,
        **kwargs: Any
    ) -> None:
        """
        Add a function to be executed as a background task.

        Parameters:
        - func: Function to execute (can be sync or async)
        - args: Positional arguments to pass to the function
        - kwargs: Keyword arguments to pass to the function

        Behaviors:
        - Tasks execute in the order they were added
        - Async functions are awaited, sync functions run in thread pool
        - An exception raised by a task cannot change the response (it has
          already been sent), but it propagates to the server and the
          remaining tasks are not executed
        - Tasks run after response is sent but before connection closes
        - Multiple tasks can be added and will execute sequentially
        """
41
```
42
43
### Background Task Execution
44
45
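Background tasks can be either synchronous or asynchronous functions: coroutine functions are awaited directly, while regular functions are run in a thread pool so they do not block the event loop.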
46
47
```python { .api }
48
# Sync background task function
49
def sync_background_task(param1: str, param2: int):
    """
    Synchronous background task function.

    Parameters:
    - param1, param2: Function-specific parameters

    Note: Sync functions run in thread pool to avoid blocking
    """
58
59
# Async background task function
60
async def async_background_task(param1: str, param2: int):
    """
    Asynchronous background task function.

    Parameters:
    - param1, param2: Function-specific parameters

    Note: Async functions are awaited directly
    """
69
```
70
71
## Usage Examples
72
73
### Basic Background Tasks
74
75
```python
76
from fastapi import FastAPI, BackgroundTasks
77
import logging
78
79
app = FastAPI()
80
81
# Set up logging
82
logging.basicConfig(level=logging.INFO)
83
logger = logging.getLogger(__name__)
84
85
def write_log(message: str, level: str = "info"):
    """Simple logging background task.

    Parameters:
    - message: Text to log through the module-level ``logger``
    - level: Level name ("debug", "info", "warning", "error" or
      "critical"), matched case-insensitively

    An unrecognized level falls back to INFO so the message is never
    silently dropped.
    """
    levels = {
        "debug": logging.DEBUG,
        "info": logging.INFO,
        "warning": logging.WARNING,
        "error": logging.ERROR,
        "critical": logging.CRITICAL,
    }
    logger.log(levels.get(level.lower(), logging.INFO), message)
93
94
def send_notification(user_email: str, message: str):
    """Simulate sending email notification by printing it to stdout.

    Parameters:
    - user_email: Recipient address shown in the printed line
    - message: Notification text
    """
    print(f"Sending email to {user_email}: {message}")
    # In real application, this would use an email service
    # time.sleep(2)  # Simulate email sending delay
99
100
@app.post("/items/")
async def create_item(item_data: dict, background_tasks: BackgroundTasks):
    """Create an item, then log and notify in the background.

    The two tasks are queued in order (log first, notification second)
    and run only after the response below has been sent.
    """
    # Process the item creation
    item_id = save_item(item_data)  # Your item saving logic

    # Add background tasks
    background_tasks.add_task(
        write_log,
        f"Item {item_id} created successfully",
        "info"
    )

    background_tasks.add_task(
        send_notification,
        "admin@example.com",
        f"New item created: {item_data.get('name', 'Unknown')}"
    )

    # Response is sent immediately, tasks run after
    return {"item_id": item_id, "status": "created"}
120
121
@app.delete("/items/{item_id}")
async def delete_item(item_id: int, background_tasks: BackgroundTasks):
    """Delete an item and record the deletion in a background log task."""
    # Delete the item
    item_name = delete_item_from_db(item_id)  # Your deletion logic

    # Log deletion in background
    background_tasks.add_task(
        write_log,
        f"Item {item_id} ({item_name}) deleted",
        "info"
    )

    return {"message": "Item deleted"}
134
```
135
136
### Async Background Tasks
137
138
```python
139
import asyncio
from datetime import datetime  # needed by async_log_to_file's timestamp

import aiofiles
import aiohttp
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()
145
146
async def async_log_to_file(message: str, filename: str = "app.log"):
    """Async background task for file logging.

    Appends one ``<ISO timestamp>: <message>`` line to ``filename``.
    """
    async with aiofiles.open(filename, mode="a") as file:
        timestamp = datetime.now().isoformat()
        await file.write(f"{timestamp}: {message}\n")
151
152
async def send_webhook(webhook_url: str, data: dict):
    """Async background task for sending webhook.

    POSTs ``data`` as JSON to ``webhook_url`` and prints the outcome.
    Failures are reported, never raised, so a broken webhook cannot
    abort the tasks queued after this one.
    """
    async with aiohttp.ClientSession() as session:
        try:
            async with session.post(webhook_url, json=data) as response:
                if response.status == 200:
                    print(f"Webhook sent successfully to {webhook_url}")
                else:
                    print(f"Webhook failed: {response.status}")
        except Exception as e:
            print(f"Webhook error: {e}")
163
164
async def process_image_async(image_path: str, user_id: int):
    """Async background task for image processing.

    The processing itself is simulated with a two-second sleep; afterwards
    the user's image status is set to "processed".
    """
    print(f"Starting image processing for user {user_id}")

    # Simulate async image processing
    await asyncio.sleep(2)

    # Update database with processing result
    await update_user_image_status(user_id, "processed")
    print(f"Image processing completed for user {user_id}")
174
175
@app.post("/upload-image/")
async def upload_image(
    image_data: dict,
    user_id: int,
    background_tasks: BackgroundTasks
):
    """Save an uploaded image and queue the follow-up work.

    Three async tasks are queued in order: file logging, image processing
    and a webhook call. They run after the response has been sent.
    """
    # Save image immediately
    image_path = save_image(image_data)  # Your image saving logic

    # Add async background tasks
    background_tasks.add_task(
        async_log_to_file,
        f"Image uploaded by user {user_id}: {image_path}"
    )

    background_tasks.add_task(
        process_image_async,
        image_path,
        user_id
    )

    background_tasks.add_task(
        send_webhook,
        "https://external-service.com/webhook",
        {"event": "image_uploaded", "user_id": user_id, "image_path": image_path}
    )

    return {"message": "Image uploaded", "status": "processing"}
203
```
204
205
### Error Handling in Background Tasks
206
207
```python
208
from fastapi import FastAPI, BackgroundTasks
209
import logging
210
211
app = FastAPI()
212
logger = logging.getLogger(__name__)
213
214
def risky_background_task(data: dict):
    """Background task that might fail.

    Any exception is caught here, logged and forwarded to
    ``send_error_notification`` instead of being raised.
    """
    try:
        # Simulate some risky operation
        if data.get("cause_error"):
            raise ValueError("Simulated error in background task")

        print(f"Processing data: {data}")
        # Actual processing logic here

    except Exception as e:
        # Log the error (errors in background tasks don't affect response)
        logger.error(f"Background task failed: {e}")

        # Could also send error notification, update database, etc.
        send_error_notification(str(e))
230
231
def send_error_notification(error_message: str):
    """Send notification about background task error (printed to stdout)."""
    print(f"ERROR NOTIFICATION: {error_message}")
234
235
def cleanup_on_error(resource_id: str):
    """Cleanup resources if main task fails (this example only prints)."""
    print(f"Cleaning up resource: {resource_id}")
238
239
@app.post("/process-data/")
async def process_data(data: dict, background_tasks: BackgroundTasks):
    """Process data, queueing follow-up work only when processing succeeds.

    If the main processing raises, the created resource is cleaned up
    before the exception is re-raised.
    """
    resource_id = create_resource()  # Your resource creation logic

    try:
        # Main processing
        result = process_main_data(data)  # Your main logic
    except Exception:
        # Queued background tasks only run when the handler returns a
        # response; when it raises they are discarded. Clean up inline
        # instead of scheduling the cleanup.
        cleanup_on_error(resource_id)
        raise  # Re-raise to return error response

    # Add successful processing background task
    background_tasks.add_task(risky_background_task, data)

    return {"result": result, "resource_id": resource_id}
256
```
257
258
### Complex Background Task Workflows
259
260
```python
261
from fastapi import FastAPI, BackgroundTasks
262
from typing import List
263
import json
264
265
app = FastAPI()
266
267
def step1_validate_data(order_id: int, data: dict):
    """First step: validate order data.

    Raises:
    - ValueError: if ``data`` has no truthy "customer_id"
    """
    print(f"Step 1: Validating order {order_id}")
    # Validation logic here

    if not data.get("customer_id"):
        raise ValueError("Customer ID is required")

    print(f"Order {order_id} validation completed")
276
277
def step2_update_inventory(order_id: int, items: List[dict]):
    """Second step: update inventory.

    Each item must provide "id" and "quantity" keys.
    """
    print(f"Step 2: Updating inventory for order {order_id}")

    for item in items:
        # Update inventory logic
        print(f"Updated inventory for item {item['id']}: -{item['quantity']}")

    print(f"Inventory update completed for order {order_id}")
286
287
def step3_send_confirmation(order_id: int, customer_email: str):
    """Third step: send confirmation email (simulated with prints)."""
    print(f"Step 3: Sending confirmation for order {order_id} to {customer_email}")

    # Email sending logic here
    print(f"Confirmation email sent for order {order_id}")
293
294
def step4_notify_fulfillment(order_id: int, items: List[dict]):
    """Fourth step: notify fulfillment center.

    Builds the payload and prints it as JSON; ``items`` must therefore be
    JSON-serializable.
    """
    print(f"Step 4: Notifying fulfillment center for order {order_id}")

    fulfillment_data = {
        "order_id": order_id,
        "items": items,
        "priority": "standard"
    }

    # Send to fulfillment system
    print(f"Fulfillment notification sent: {json.dumps(fulfillment_data)}")
306
307
@app.post("/orders/")
async def create_order(order_data: dict, background_tasks: BackgroundTasks):
    """Create an order and queue its four processing steps.

    The steps run one after another in the order they are added. If a
    step raises (for example the validation step), the steps queued after
    it are not executed.
    """
    # Create order immediately
    order_id = create_order_record(order_data)  # Your order creation logic

    # Add sequential background tasks
    # These will execute in the order they were added
    background_tasks.add_task(
        step1_validate_data,
        order_id,
        order_data
    )

    background_tasks.add_task(
        step2_update_inventory,
        order_id,
        order_data.get("items", [])
    )

    background_tasks.add_task(
        step3_send_confirmation,
        order_id,
        order_data.get("customer_email")
    )

    background_tasks.add_task(
        step4_notify_fulfillment,
        order_id,
        order_data.get("items", [])
    )

    return {
        "order_id": order_id,
        "status": "created",
        "message": "Order created, processing in background"
    }
343
```
344
345
### Background Tasks with External Services
346
347
```python
348
import os  # used by cleanup_temp_files
import time
from datetime import datetime
from typing import List  # used by cleanup_temp_files' annotation

import requests
from fastapi import FastAPI, BackgroundTasks, HTTPException

app = FastAPI()
354
355
def sync_with_external_service(user_id: int, user_data: dict):
    """Sync user data with external CRM system.

    POSTs ``user_data`` as JSON with a 30-second timeout. Both non-200
    responses and request errors are printed and recorded through
    ``log_sync_failure``; nothing is raised.
    """
    crm_endpoint = "https://crm.example.com/api/users"

    try:
        response = requests.post(
            f"{crm_endpoint}/{user_id}",
            json=user_data,
            timeout=30
        )

        if response.status_code == 200:
            print(f"User {user_id} synced with CRM successfully")
            log_sync_success(user_id)
        else:
            print(f"CRM sync failed for user {user_id}: {response.status_code}")
            log_sync_failure(user_id, response.status_code)

    except requests.RequestException as e:
        print(f"CRM sync error for user {user_id}: {e}")
        log_sync_failure(user_id, str(e))
376
377
def generate_report(report_type: str, user_id: int, filters: dict):
    """Generate report in background.

    Report generation is simulated with a five-second sleep; the resulting
    record is saved and the user is notified with the new report id.
    """
    print(f"Starting {report_type} report generation for user {user_id}")

    # Simulate report generation
    time.sleep(5)  # This would be actual report processing

    report_data = {
        "type": report_type,
        "user_id": user_id,
        "filters": filters,
        "generated_at": datetime.now().isoformat(),
        "status": "completed"
    }

    # Save report to storage
    report_id = save_report(report_data)  # Your storage logic

    # Notify user that report is ready
    notify_report_ready(user_id, report_id)

    print(f"Report {report_id} generated successfully")
399
400
def cleanup_temp_files(file_paths: List[str]):
    """Clean up temporary files.

    Removal is best-effort: a path that cannot be removed is reported and
    the remaining paths are still processed.
    """
    for file_path in file_paths:
        try:
            os.remove(file_path)
            print(f"Removed temp file: {file_path}")
        except OSError as e:
            print(f"Failed to remove {file_path}: {e}")
408
409
@app.post("/users/{user_id}")
async def update_user(
    user_id: int,
    user_data: dict,
    background_tasks: BackgroundTasks
):
    """Update a user and sync the change to the CRM in the background."""
    # Update user in main database
    updated_user = update_user_record(user_id, user_data)  # Your update logic

    # Sync with external services in background
    background_tasks.add_task(
        sync_with_external_service,
        user_id,
        user_data
    )

    return {"user": updated_user, "status": "updated"}
426
427
@app.post("/reports/generate")
async def request_report(
    report_request: dict,
    current_user_id: int,
    background_tasks: BackgroundTasks
):
    """Validate a report request and start generation in the background.

    Raises:
    - HTTPException(400): if the request has no "type"
    """
    # Validate request immediately
    if not report_request.get("type"):
        raise HTTPException(status_code=400, detail="Report type is required")

    # Start report generation in background
    background_tasks.add_task(
        generate_report,
        report_request["type"],
        current_user_id,
        report_request.get("filters", {})
    )

    return {
        "message": "Report generation started",
        "status": "processing",
        "estimated_time": "5-10 minutes"
    }
450
451
@app.post("/files/process")
async def process_files(
    file_data: dict,
    background_tasks: BackgroundTasks
):
    """Process uploaded file data and remove the temp files afterwards.

    The temp files are deleted by a background task, i.e. only after the
    response has been sent.
    """
    # Process files and create temp files
    temp_files = create_temp_files(file_data)  # Your file processing
    processed_data = process_file_data(temp_files)  # Your processing logic

    # Clean up temp files in background
    background_tasks.add_task(cleanup_temp_files, temp_files)

    return {"processed_data": processed_data, "temp_files_count": len(temp_files)}
464
```
465
466
### Background Tasks with Database Operations
467
468
```python
469
import json
from datetime import datetime

from fastapi import FastAPI, BackgroundTasks, Depends, HTTPException, Request
from sqlalchemy.orm import Session

app = FastAPI()


def update_user_stats(user_id: int, action: str, db_session: Session):
    """Update user statistics in background.

    Parameters:
    - user_id: User whose statistics row is updated (created if missing)
    - action: "login" or "purchase"; any other value changes no counter
    - db_session: Session owned by this task; it is always closed here

    Errors are rolled back and printed, never raised.
    """
    try:
        # Update user statistics
        stats = db_session.query(UserStats).filter(UserStats.user_id == user_id).first()

        if not stats:
            stats = UserStats(user_id=user_id)
            db_session.add(stats)

        # NOTE(review): counters on a freshly created row may still be None
        # if UserStats only defines database-side defaults; "or 0" guards
        # that case. Confirm against the model.
        if action == "login":
            stats.login_count = (stats.login_count or 0) + 1
            stats.last_login = datetime.now()
        elif action == "purchase":
            stats.purchase_count = (stats.purchase_count or 0) + 1
            stats.last_purchase = datetime.now()

        db_session.commit()
        print(f"Updated stats for user {user_id}: {action}")

    except Exception as e:
        db_session.rollback()
        print(f"Failed to update stats for user {user_id}: {e}")

    finally:
        db_session.close()


def log_audit_event(user_id: int, event_type: str, details: dict, db_session: Session):
    """Log audit events in background.

    ``details`` is stored as a JSON string. The session is owned by this
    task and always closed here; errors are rolled back and printed.
    """
    try:
        audit_log = AuditLog(
            user_id=user_id,
            event_type=event_type,
            details=json.dumps(details),
            timestamp=datetime.now()
        )

        db_session.add(audit_log)
        db_session.commit()
        print(f"Logged audit event: {event_type} for user {user_id}")

    except Exception as e:
        db_session.rollback()
        print(f"Failed to log audit event: {e}")

    finally:
        db_session.close()


@app.post("/login")
async def login(
    credentials: dict,
    background_tasks: BackgroundTasks,
    request: Request,
    db: Session = Depends(get_db)
):
    """Authenticate a user and record the login in the background.

    Raises:
    - HTTPException(401): if authentication fails
    """
    # Authenticate user
    user = authenticate_user(credentials)  # Your auth logic

    if not user:
        raise HTTPException(status_code=401, detail="Invalid credentials")

    # Create new database session for background task
    # (the original session will be closed when request ends)
    background_db = SessionLocal()

    # Update login statistics in background
    background_tasks.add_task(
        update_user_stats,
        user.id,
        "login",
        background_db
    )

    # request.client can be None (e.g. under some test transports)
    client_host = request.client.host if request.client else None

    # Log login event in background
    background_tasks.add_task(
        log_audit_event,
        user.id,
        "login",
        {"ip_address": client_host, "user_agent": request.headers.get("user-agent")},
        SessionLocal()  # Another new session
    )

    return {"access_token": create_access_token(user.id)}
556
```
557
558
### Testing Background Tasks
559
560
```python
561
from fastapi import FastAPI, BackgroundTasks
562
from fastapi.testclient import TestClient
563
import pytest
564
from unittest.mock import patch, Mock
565
566
app = FastAPI()
567
568
# Mock functions for testing
569
sent_emails = []
570
logged_messages = []
571
572
def mock_send_email(recipient: str, subject: str, body: str):
    """Mock email function for testing.

    Records the call in the module-level ``sent_emails`` list instead of
    sending anything.
    """
    sent_emails.append({
        "recipient": recipient,
        "subject": subject,
        "body": body
    })
579
580
def mock_log_message(message: str):
    """Mock logging function for testing; appends to ``logged_messages``."""
    logged_messages.append(message)
583
584
@app.post("/test-endpoint")
async def test_endpoint(data: dict, background_tasks: BackgroundTasks):
    """Endpoint under test: queues one email task and one log task."""
    background_tasks.add_task(
        mock_send_email,
        "test@example.com",
        "Test Subject",
        f"Test body: {data.get('message', '')}"
    )

    background_tasks.add_task(
        mock_log_message,
        f"Processed data: {data}"
    )

    return {"status": "success"}
599
600
def test_background_tasks():
    """Check the response and the side effects of both background tasks."""
    client = TestClient(app)

    # Clear mock data
    sent_emails.clear()
    logged_messages.clear()

    # Make request
    response = client.post("/test-endpoint", json={"message": "test"})

    # Check response
    assert response.status_code == 200
    assert response.json() == {"status": "success"}

    # Check background tasks were executed
    # Note: In TestClient, background tasks run synchronously
    assert len(sent_emails) == 1
    assert sent_emails[0]["recipient"] == "test@example.com"
    assert sent_emails[0]["subject"] == "Test Subject"
    assert "test" in sent_emails[0]["body"]

    assert len(logged_messages) == 1
    assert "test" in logged_messages[0]
623
624
# Patch the function the endpoint actually schedules. The handler looks the
# name up in this module's globals when the request runs, so the replacement
# installed here is what gets queued as the background task.
@patch(f"{__name__}.mock_send_email")
def test_background_tasks_with_mocks(mock_send_email):
    """Test background tasks with proper mocking."""
    client = TestClient(app)

    response = client.post("/test-endpoint", json={"message": "test"})

    assert response.status_code == 200

    # Verify the mock was called
    mock_send_email.assert_called_once_with(
        "test@example.com",
        "Test Subject",
        "Test body: test"
    )
639
```