0
# Background Tasks
1
2
FastAPI provides built-in support for background tasks that execute after sending the HTTP response to the client. This allows you to perform time-consuming operations like sending emails, processing files, or updating databases without making the client wait for the response.
3
4
## Capabilities
5
6
### Background Tasks Class
7
8
Class for managing and executing background tasks after HTTP response completion.
9
10
```python { .api }
11
class BackgroundTasks:
12
def __init__(self, tasks: List[Task] = None) -> None:
13
"""
14
Background tasks container.
15
16
Parameters:
17
- tasks: Optional list of initial tasks to execute
18
"""
19
self.tasks = tasks or []
20
21
def add_task(
22
self,
23
func: Callable,
24
*args: Any,
25
**kwargs: Any
26
) -> None:
27
"""
28
Add background task to execute after response.
29
30
Parameters:
31
- func: Function to execute in background
32
- args: Positional arguments for the function
33
- kwargs: Keyword arguments for the function
34
"""
35
```
36
37
### Task Execution Interface
38
39
Functions can be added as background tasks with any signature and parameters.
40
41
```python { .api }
42
def background_task_function(*args: Any, **kwargs: Any) -> None:
43
"""
44
Background task function signature.
45
46
Parameters:
47
- args: Positional arguments passed from add_task
48
- kwargs: Keyword arguments passed from add_task
49
50
Note: Background tasks are executed synchronously after response
51
"""
52
53
async def async_background_task_function(*args: Any, **kwargs: Any) -> None:
54
"""
55
Async background task function signature.
56
57
Parameters:
58
- args: Positional arguments passed from add_task
59
- kwargs: Keyword arguments passed from add_task
60
61
Note: Async background tasks are awaited after response
62
"""
63
```
64
65
### Background Tasks Dependency
66
67
Background tasks can be injected as dependencies into route handlers.
68
69
```python { .api }
70
def route_handler(
71
background_tasks: BackgroundTasks,
72
# other parameters...
73
) -> Any:
74
"""
75
Route handler with background tasks dependency.
76
77
Parameters:
78
- background_tasks: BackgroundTasks instance for adding tasks
79
80
The BackgroundTasks instance is automatically provided by FastAPI
81
"""
82
```
83
84
## Usage Examples
85
86
### Basic Background Task
87
88
```python
89
from fastapi import FastAPI, BackgroundTasks
90
91
app = FastAPI()
92
93
def write_notification(email: str, message: str = ""):
94
with open("log.txt", mode="w") as email_file:
95
content = f"notification for {email}: {message}\n"
96
email_file.write(content)
97
98
@app.post("/send-notification/{email}")
99
async def send_notification(email: str, background_tasks: BackgroundTasks):
100
background_tasks.add_task(write_notification, email, message="some notification")
101
return {"message": "Notification sent in the background"}
102
```
103
104
### Multiple Background Tasks
105
106
```python
107
from fastapi import FastAPI, BackgroundTasks
108
import time
109
110
app = FastAPI()
111
112
def slow_task_1(name: str):
113
time.sleep(2)
114
print(f"Task 1 completed for {name}")
115
116
def slow_task_2(name: str):
117
time.sleep(3)
118
print(f"Task 2 completed for {name}")
119
120
def cleanup_task():
121
print("Cleanup completed")
122
123
@app.post("/process/{name}")
124
async def process_data(name: str, background_tasks: BackgroundTasks):
125
# Add multiple background tasks
126
background_tasks.add_task(slow_task_1, name)
127
background_tasks.add_task(slow_task_2, name)
128
background_tasks.add_task(cleanup_task)
129
130
return {"message": f"Processing started for {name}"}
131
```
132
133
### Background Task with Email Sending
134
135
```python
136
from fastapi import FastAPI, BackgroundTasks
137
import smtplib
138
from email.mime.text import MIMEText
139
from email.mime.multipart import MIMEMultipart
140
141
app = FastAPI()
142
143
def send_email(to_email: str, subject: str, body: str):
144
# Email configuration (use environment variables in production)
145
smtp_server = "smtp.gmail.com"
146
smtp_port = 587
147
sender_email = "your-email@gmail.com"
148
sender_password = "your-password"
149
150
try:
151
# Create message
152
message = MIMEMultipart()
153
message["From"] = sender_email
154
message["To"] = to_email
155
message["Subject"] = subject
156
157
message.attach(MIMEText(body, "plain"))
158
159
# Send email
160
with smtplib.SMTP(smtp_server, smtp_port) as server:
161
server.starttls()
162
server.login(sender_email, sender_password)
163
server.send_message(message)
164
165
print(f"Email sent successfully to {to_email}")
166
except Exception as e:
167
print(f"Failed to send email: {str(e)}")
168
169
@app.post("/send-email/")
170
async def send_email_endpoint(
171
to_email: str,
172
subject: str,
173
body: str,
174
background_tasks: BackgroundTasks
175
):
176
background_tasks.add_task(send_email, to_email, subject, body)
177
return {"message": "Email will be sent in the background"}
178
```
179
180
### Background Task with File Processing
181
182
```python
183
import os
184
import csv
185
from typing import List
186
from fastapi import FastAPI, BackgroundTasks, UploadFile, File
187
188
app = FastAPI()
189
190
def process_csv_file(filename: str, user_id: int):
191
try:
192
with open(filename, 'r') as file:
193
csv_reader = csv.DictReader(file)
194
processed_rows = 0
195
196
for row in csv_reader:
197
# Process each row (simulate some work)
198
process_csv_row(row, user_id)
199
processed_rows += 1
200
201
# Clean up temporary file
202
os.remove(filename)
203
204
# Log completion
205
print(f"Processed {processed_rows} rows for user {user_id}")
206
207
# Notify user (in a real app, you might update a database or send a webhook)
208
notify_user_completion(user_id, processed_rows)
209
210
except Exception as e:
211
print(f"Error processing CSV: {str(e)}")
212
notify_user_error(user_id, str(e))
213
214
def process_csv_row(row: dict, user_id: int):
215
# Simulate row processing
216
print(f"Processing row for user {user_id}: {row}")
217
218
def notify_user_completion(user_id: int, row_count: int):
219
# In a real application, this might send a push notification or update a database
220
print(f"Notifying user {user_id}: processed {row_count} rows")
221
222
def notify_user_error(user_id: int, error_message: str):
223
print(f"Notifying user {user_id} of error: {error_message}")
224
225
@app.post("/upload-csv/{user_id}")
226
async def upload_csv(
227
user_id: int,
228
background_tasks: BackgroundTasks,
229
file: UploadFile = File(...)
230
):
231
# Save uploaded file temporarily
232
temp_filename = f"temp_{user_id}_{file.filename}"
233
234
with open(temp_filename, "wb") as temp_file:
235
content = await file.read()
236
temp_file.write(content)
237
238
# Process file in background
239
background_tasks.add_task(process_csv_file, temp_filename, user_id)
240
241
return {"message": f"File {file.filename} uploaded and will be processed in background"}
242
```
243
244
### Background Task with Database Operations
245
246
```python
247
from fastapi import FastAPI, BackgroundTasks
248
from sqlalchemy import create_engine, Column, Integer, String, DateTime
249
from sqlalchemy.ext.declarative import declarative_base
250
from sqlalchemy.orm import sessionmaker
251
from datetime import datetime
252
253
app = FastAPI()
254
255
# Database setup (simplified)
256
Base = declarative_base()
257
engine = create_engine("sqlite:///./test.db")
258
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
259
260
class ActivityLog(Base):
261
__tablename__ = "activity_logs"
262
263
id = Column(Integer, primary_key=True, index=True)
264
user_id = Column(Integer, index=True)
265
action = Column(String)
266
timestamp = Column(DateTime, default=datetime.utcnow)
267
268
Base.metadata.create_all(bind=engine)
269
270
def log_user_activity(user_id: int, action: str):
271
db = SessionLocal()
272
try:
273
activity = ActivityLog(user_id=user_id, action=action)
274
db.add(activity)
275
db.commit()
276
print(f"Logged activity: User {user_id} performed {action}")
277
except Exception as e:
278
print(f"Failed to log activity: {str(e)}")
279
db.rollback()
280
finally:
281
db.close()
282
283
def update_user_statistics(user_id: int):
284
db = SessionLocal()
285
try:
286
# Update user statistics based on recent activity
287
# This is a placeholder for complex statistical calculations
288
print(f"Updated statistics for user {user_id}")
289
except Exception as e:
290
print(f"Failed to update statistics: {str(e)}")
291
finally:
292
db.close()
293
294
@app.post("/user/{user_id}/action")
295
async def perform_user_action(
296
user_id: int,
297
action: str,
298
background_tasks: BackgroundTasks
299
):
300
# Log the activity in background
301
background_tasks.add_task(log_user_activity, user_id, action)
302
303
# Update user statistics in background
304
background_tasks.add_task(update_user_statistics, user_id)
305
306
return {"message": f"Action '{action}' recorded for user {user_id}"}
307
```
308
309
### Async Background Tasks
310
311
```python
312
import asyncio
313
import aiohttp
314
from fastapi import FastAPI, BackgroundTasks
315
316
app = FastAPI()
317
318
async def fetch_external_data(url: str, user_id: int):
319
try:
320
async with aiohttp.ClientSession() as session:
321
async with session.get(url) as response:
322
data = await response.json()
323
324
# Process the fetched data
325
await process_external_data(data, user_id)
326
327
print(f"Successfully processed external data for user {user_id}")
328
except Exception as e:
329
print(f"Failed to fetch external data: {str(e)}")
330
331
async def process_external_data(data: dict, user_id: int):
332
# Simulate async processing
333
await asyncio.sleep(1)
334
print(f"Processed data for user {user_id}: {len(data)} items")
335
336
async def send_webhook(webhook_url: str, payload: dict):
337
try:
338
async with aiohttp.ClientSession() as session:
339
async with session.post(webhook_url, json=payload) as response:
340
if response.status == 200:
341
print("Webhook sent successfully")
342
else:
343
print(f"Webhook failed with status {response.status}")
344
except Exception as e:
345
print(f"Webhook error: {str(e)}")
346
347
@app.post("/trigger-external-fetch/{user_id}")
348
async def trigger_external_fetch(
349
user_id: int,
350
data_url: str,
351
webhook_url: str,
352
background_tasks: BackgroundTasks
353
):
354
# Fetch external data in background
355
background_tasks.add_task(fetch_external_data, data_url, user_id)
356
357
# Send webhook notification in background
358
payload = {"user_id": user_id, "action": "external_fetch_triggered"}
359
background_tasks.add_task(send_webhook, webhook_url, payload)
360
361
return {"message": f"External data fetch triggered for user {user_id}"}
362
```
363
364
### Background Tasks with Error Handling
365
366
```python
367
import logging
368
from fastapi import FastAPI, BackgroundTasks
369
370
# Configure logging
371
logging.basicConfig(level=logging.INFO)
372
logger = logging.getLogger(__name__)
373
374
app = FastAPI()
375
376
def safe_background_task(task_name: str, *args, **kwargs):
377
"""Wrapper for background tasks with error handling"""
378
try:
379
logger.info(f"Starting background task: {task_name}")
380
381
# Determine which task to run based on task_name
382
if task_name == "send_notification":
383
send_notification_task(*args, **kwargs)
384
elif task_name == "process_data":
385
process_data_task(*args, **kwargs)
386
elif task_name == "cleanup":
387
cleanup_task(*args, **kwargs)
388
else:
389
raise ValueError(f"Unknown task: {task_name}")
390
391
logger.info(f"Completed background task: {task_name}")
392
393
except Exception as e:
394
logger.error(f"Background task {task_name} failed: {str(e)}")
395
# In a real app, you might want to retry, alert admins, etc.
396
397
def send_notification_task(user_id: int, message: str):
398
if not user_id:
399
raise ValueError("User ID is required")
400
print(f"Notification sent to user {user_id}: {message}")
401
402
def process_data_task(data_id: int):
403
if data_id <= 0:
404
raise ValueError("Invalid data ID")
405
print(f"Processed data {data_id}")
406
407
def cleanup_task():
408
print("Cleanup completed")
409
410
@app.post("/safe-task/{user_id}")
411
async def create_safe_task(user_id: int, message: str, background_tasks: BackgroundTasks):
412
# Use the safe wrapper for error handling
413
background_tasks.add_task(safe_background_task, "send_notification", user_id, message)
414
background_tasks.add_task(safe_background_task, "cleanup")
415
416
return {"message": "Tasks scheduled with error handling"}
417
```
418
419
### Background Tasks with Progress Tracking
420
421
```python
422
import time
423
from typing import Dict
424
from fastapi import FastAPI, BackgroundTasks
425
426
app = FastAPI()
427
428
# In-memory progress tracking (use Redis or database in production)
429
task_progress: Dict[str, dict] = {}
430
431
def long_running_task(task_id: str, items_count: int):
432
task_progress[task_id] = {
433
"status": "running",
434
"progress": 0,
435
"total": items_count,
436
"message": "Starting task..."
437
}
438
439
try:
440
for i in range(items_count):
441
# Simulate work
442
time.sleep(0.5)
443
444
# Update progress
445
task_progress[task_id].update({
446
"progress": i + 1,
447
"message": f"Processing item {i + 1} of {items_count}"
448
})
449
450
# Task completed
451
task_progress[task_id].update({
452
"status": "completed",
453
"message": "Task completed successfully"
454
})
455
456
except Exception as e:
457
task_progress[task_id].update({
458
"status": "failed",
459
"message": f"Task failed: {str(e)}"
460
})
461
462
@app.post("/start-task/{task_id}")
463
async def start_task(task_id: str, items_count: int, background_tasks: BackgroundTasks):
464
if task_id in task_progress:
465
return {"error": "Task with this ID already exists"}
466
467
background_tasks.add_task(long_running_task, task_id, items_count)
468
469
return {
470
"message": f"Task {task_id} started",
471
"task_id": task_id,
472
"check_progress_url": f"/task-progress/{task_id}"
473
}
474
475
@app.get("/task-progress/{task_id}")
476
async def get_task_progress(task_id: str):
477
if task_id not in task_progress:
478
return {"error": "Task not found"}
479
480
return task_progress[task_id]
481
```
482
483
### Background Tasks with Dependency Injection
484
485
```python
486
from fastapi import FastAPI, BackgroundTasks, Depends
487
488
app = FastAPI()
489
490
class EmailService:
491
def send_email(self, to: str, subject: str, body: str):
492
print(f"Sending email to {to}: {subject}")
493
494
class DatabaseService:
495
def log_activity(self, user_id: int, action: str):
496
print(f"Logging: User {user_id} performed {action}")
497
498
# Dependency providers
499
def get_email_service() -> EmailService:
500
return EmailService()
501
502
def get_database_service() -> DatabaseService:
503
return DatabaseService()
504
505
def notification_task(
506
user_id: int,
507
action: str,
508
email_service: EmailService,
509
db_service: DatabaseService
510
):
511
# Use injected services in background task
512
db_service.log_activity(user_id, action)
513
email_service.send_email(
514
f"user{user_id}@example.com",
515
"Action Performed",
516
f"You performed: {action}"
517
)
518
519
@app.post("/action/{user_id}")
520
async def perform_action(
521
user_id: int,
522
action: str,
523
background_tasks: BackgroundTasks,
524
email_service: EmailService = Depends(get_email_service),
525
db_service: DatabaseService = Depends(get_database_service)
526
):
527
# Pass dependencies to background task
528
background_tasks.add_task(
529
notification_task,
530
user_id,
531
action,
532
email_service,
533
db_service
534
)
535
536
return {"message": f"Action {action} performed for user {user_id}"}
537
```