pypi-fastapi

Description
FastAPI framework, high performance, easy to learn, fast to code, ready for production
Author
tessl
Last updated

How to use

npx @tessl/cli registry install tessl/pypi-fastapi@0.116.0

background-tasks.md docs/

1
# Background Tasks
2
3
FastAPI provides built-in support for background tasks that run after the HTTP response has been sent to the client. This lets you perform time-consuming operations such as sending emails, processing files, or updating databases without making the client wait for them to finish.
4
5
## Capabilities
6
7
### Background Tasks Class
8
9
Class for managing and executing background tasks after HTTP response completion.
10
11
```python { .api }
12
class BackgroundTasks:
    """Container for tasks that are executed after the HTTP response completes."""

    def __init__(self, tasks: Optional[Sequence[BackgroundTask]] = None) -> None:
        """
        Background tasks container.

        Parameters:
        - tasks: Optional sequence of initial tasks to execute
        """
        # Copy into a fresh list so the container never aliases (and later
        # mutates) the sequence owned by the caller.
        self.tasks = list(tasks) if tasks else []

    def add_task(
        self,
        func: Callable,
        *args: Any,
        **kwargs: Any
    ) -> None:
        """
        Add background task to execute after response.

        Parameters:
        - func: Function to execute in background
        - args: Positional arguments for the function
        - kwargs: Keyword arguments for the function
        """
36
```
37
38
### Task Execution Interface
39
40
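Any callable, whether a regular (`def`) function or a coroutine (`async def`) function, can be added as a background task; the positional and keyword arguments given to `add_task` are passed through to it.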
41
42
```python { .api }
43
def background_task_function(*args: Any, **kwargs: Any) -> None:
    """
    Background task function signature.

    Parameters:
    - args: Positional arguments passed from add_task
    - kwargs: Keyword arguments passed from add_task

    Note: Regular (def) background tasks are run in a worker thread
    after the response has been sent, one at a time in the order
    they were added
    """
53
54
async def async_background_task_function(*args: Any, **kwargs: Any) -> None:
    """
    Async background task function signature.

    Parameters:
    - args: Positional arguments passed from add_task
    - kwargs: Keyword arguments passed from add_task

    Note: Async background tasks are awaited after the response has
    been sent
    """
64
```
65
66
### Background Tasks Dependency
67
68
A `BackgroundTasks` instance is injected into a route handler when the handler declares a parameter annotated with that type.
69
70
```python { .api }
71
def route_handler(
    background_tasks: BackgroundTasks,
    # other parameters...
) -> Any:
    """
    Route handler with background tasks dependency.

    Parameters:
    - background_tasks: BackgroundTasks instance for adding tasks,
      requested by annotating the parameter with the BackgroundTasks type

    The BackgroundTasks instance is automatically provided by FastAPI
    """
83
```
84
85
## Usage Examples
86
87
### Basic Background Task
88
89
```python
90
from fastapi import FastAPI, BackgroundTasks
91
92
app = FastAPI()
93
94
def write_notification(email: str, message: str = ""):
    """Replace the contents of log.txt with one notification line for *email*."""
    # mode="w" truncates the file, so only the latest notification is kept.
    with open("log.txt", mode="w") as log_file:
        log_file.write(f"notification for {email}: {message}\n")
98
99
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    """Queue write_notification for *email* and return an acknowledgement."""
    background_tasks.add_task(
        write_notification,
        email,
        message="some notification",
    )
    acknowledgement = {"message": "Notification sent in the background"}
    return acknowledgement
103
```
104
105
### Multiple Background Tasks
106
107
```python
108
from fastapi import FastAPI, BackgroundTasks
109
import time
110
111
app = FastAPI()
112
113
def slow_task_1(name: str):
    """Block for two seconds to simulate work, then print a completion line."""
    delay_seconds = 2
    time.sleep(delay_seconds)
    print(f"Task 1 completed for {name}")
116
117
def slow_task_2(name: str):
    """Block for three seconds to simulate work, then print a completion line."""
    delay_seconds = 3
    time.sleep(delay_seconds)
    print(f"Task 2 completed for {name}")
120
121
def cleanup_task():
    """Print a fixed message marking the end of the cleanup step."""
    done_message = "Cleanup completed"
    print(done_message)
123
124
@app.post("/process/{name}")
async def process_data(name: str, background_tasks: BackgroundTasks):
    """Queue both slow tasks and the cleanup task for *name*, then acknowledge."""
    # Queue order is kept: slow_task_1, slow_task_2, then cleanup_task.
    for task in (slow_task_1, slow_task_2):
        background_tasks.add_task(task, name)
    background_tasks.add_task(cleanup_task)

    return {"message": f"Processing started for {name}"}
132
```
133
134
### Background Task with Email Sending
135
136
```python
137
from fastapi import FastAPI, BackgroundTasks
138
import smtplib
139
from email.mime.text import MIMEText
140
from email.mime.multipart import MIMEMultipart
141
142
app = FastAPI()
143
144
def send_email(to_email: str, subject: str, body: str):
    """
    Send a plain-text email over SMTP using STARTTLS.

    Connection settings are read from the environment, falling back to the
    placeholder values of this example:
    - SMTP_SERVER (default "smtp.gmail.com")
    - SMTP_PORT (default 587)
    - SMTP_SENDER (default "your-email@gmail.com")
    - SMTP_PASSWORD (default "your-password")
    - SMTP_TIMEOUT in seconds (default 30)

    Parameters:
    - to_email: Recipient address
    - subject: Subject line
    - body: Plain-text message body

    Any failure (bad configuration, connection, login, send) is reported on
    stdout and not raised, so one failed email does not raise out of the
    background task.
    """
    import os  # local import: this example's import block does not include os

    try:
        # Configuration is parsed inside the try so a malformed value is
        # reported the same way as any other send failure.
        smtp_server = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
        smtp_port = int(os.environ.get("SMTP_PORT", "587"))
        sender_email = os.environ.get("SMTP_SENDER", "your-email@gmail.com")
        sender_password = os.environ.get("SMTP_PASSWORD", "your-password")
        timeout_seconds = float(os.environ.get("SMTP_TIMEOUT", "30"))

        # Create message
        message = MIMEMultipart()
        message["From"] = sender_email
        message["To"] = to_email
        message["Subject"] = subject

        message.attach(MIMEText(body, "plain"))

        # Send email; the timeout keeps an unreachable server from blocking
        # the worker indefinitely.
        with smtplib.SMTP(smtp_server, smtp_port, timeout=timeout_seconds) as server:
            server.starttls()
            server.login(sender_email, sender_password)
            server.send_message(message)

        print(f"Email sent successfully to {to_email}")
    except Exception as e:
        print(f"Failed to send email: {str(e)}")
169
170
@app.post("/send-email/")
async def send_email_endpoint(
    to_email: str,
    subject: str,
    body: str,
    background_tasks: BackgroundTasks
):
    """Schedule send_email with the request's values and acknowledge."""
    email_args = (to_email, subject, body)
    background_tasks.add_task(send_email, *email_args)
    return {"message": "Email will be sent in the background"}
179
```
180
181
### Background Task with File Processing
182
183
```python
184
import os
185
import csv
186
from typing import List
187
from fastapi import FastAPI, BackgroundTasks, UploadFile, File
188
189
app = FastAPI()
190
191
def process_csv_file(filename: str, user_id: int):
    """
    Process every row of a temporary CSV file, then delete the file.

    Parameters:
    - filename: Path of the temporary CSV file to read and remove
    - user_id: Identifier passed to the row processor and the notifiers

    Errors are printed and reported through notify_user_error rather than
    raised. The file is removed whether or not processing succeeded, so a
    failed run does not leave the temporary upload behind.
    """
    try:
        # newline="" lets the csv module handle line endings itself, which
        # is required for fields containing embedded newlines.
        with open(filename, 'r', newline='') as file:
            csv_reader = csv.DictReader(file)
            processed_rows = 0

            for row in csv_reader:
                # Process each row (simulate some work)
                process_csv_row(row, user_id)
                processed_rows += 1

        # Log completion
        print(f"Processed {processed_rows} rows for user {user_id}")

        # Notify user (in a real app, you might update a database or send a webhook)
        notify_user_completion(user_id, processed_rows)

    except Exception as e:
        print(f"Error processing CSV: {str(e)}")
        notify_user_error(user_id, str(e))
    finally:
        # Clean up temporary file on success and on failure alike
        try:
            os.remove(filename)
        except FileNotFoundError:
            pass  # the file was never created or is already gone
        except OSError as cleanup_error:
            print(f"Could not remove temporary file {filename}: {cleanup_error}")


def process_csv_row(row: dict, user_id: int):
    """Simulate row processing by printing the row and the user id."""
    print(f"Processing row for user {user_id}: {row}")


def notify_user_completion(user_id: int, row_count: int):
    """Print the completion notice for *user_id*."""
    # In a real application, this might send a push notification or update a database
    print(f"Notifying user {user_id}: processed {row_count} rows")


def notify_user_error(user_id: int, error_message: str):
    """Print the failure notice for *user_id*."""
    print(f"Notifying user {user_id} of error: {error_message}")
225
226
@app.post("/upload-csv/{user_id}")
async def upload_csv(
    user_id: int,
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...)
):
    """
    Save an uploaded CSV to a temporary file and process it in the background.

    Parameters:
    - user_id: Identifier from the path, used in the temp file name
    - background_tasks: Container the processing task is added to
    - file: The uploaded file

    Returns a message naming the uploaded file.
    """
    # file.filename is chosen by the client (and may be missing). Keep only
    # its final path component so values like "../../x" or an absolute path
    # cannot place the temporary file outside the working directory.
    client_name = (file.filename or "upload").replace("\\", "/")
    safe_name = os.path.basename(client_name) or "upload"
    temp_filename = f"temp_{user_id}_{safe_name}"

    # Read the upload before creating the file so a failed read does not
    # leave an empty temporary file behind.
    content = await file.read()
    with open(temp_filename, "wb") as temp_file:
        temp_file.write(content)

    # Process file in background
    background_tasks.add_task(process_csv_file, temp_filename, user_id)

    return {"message": f"File {file.filename} uploaded and will be processed in background"}
243
```
244
245
### Background Task with Database Operations
246
247
```python
248
from fastapi import FastAPI, BackgroundTasks
249
from sqlalchemy import create_engine, Column, Integer, String, DateTime
250
from sqlalchemy.ext.declarative import declarative_base
251
from sqlalchemy.orm import sessionmaker
252
from datetime import datetime
253
254
app = FastAPI()
255
256
# Database setup (simplified)
257
Base = declarative_base()
258
engine = create_engine("sqlite:///./test.db")
259
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
260
261
class ActivityLog(Base):
    """ORM model for rows of the "activity_logs" table."""

    __tablename__ = "activity_logs"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, index=True)
    action = Column(String)
    # The callable itself is passed as the default (it is not called here).
    # NOTE(review): datetime.utcnow returns a naive datetime and is deprecated
    # since Python 3.12; consider a timezone-aware default once the column's
    # timezone handling has been confirmed.
    timestamp = Column(DateTime, default=datetime.utcnow)
268
269
Base.metadata.create_all(bind=engine)
270
271
def log_user_activity(user_id: int, action: str):
    """
    Insert one ActivityLog row for *user_id* and *action*.

    Failures are printed and rolled back instead of raised; the session is
    closed in every case.
    """
    session = SessionLocal()
    try:
        session.add(ActivityLog(user_id=user_id, action=action))
        session.commit()
        print(f"Logged activity: User {user_id} performed {action}")
    except Exception as exc:
        print(f"Failed to log activity: {str(exc)}")
        session.rollback()
    finally:
        session.close()
283
284
def update_user_statistics(user_id: int):
    """
    Placeholder for a statistics update for *user_id*.

    Opens a session, prints a confirmation, and always closes the session;
    no query is issued yet.
    """
    session = SessionLocal()
    try:
        # Update user statistics based on recent activity
        # This is a placeholder for complex statistical calculations
        print(f"Updated statistics for user {user_id}")
    except Exception as exc:
        print(f"Failed to update statistics: {str(exc)}")
    finally:
        session.close()
294
295
@app.post("/user/{user_id}/action")
async def perform_user_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks
):
    """Queue activity logging and the statistics update, then acknowledge."""
    # Each entry is (task, positional arguments); order is kept.
    scheduled = (
        (log_user_activity, (user_id, action)),
        (update_user_statistics, (user_id,)),
    )
    for task, task_args in scheduled:
        background_tasks.add_task(task, *task_args)

    return {"message": f"Action '{action}' recorded for user {user_id}"}
308
```
309
310
### Async Background Tasks
311
312
```python
313
import asyncio
314
import aiohttp
315
from fastapi import FastAPI, BackgroundTasks
316
317
app = FastAPI()
318
319
async def fetch_external_data(url: str, user_id: int):
    """
    Download JSON from *url* and hand it to process_external_data.

    Parameters:
    - url: Address to fetch with an HTTP GET
    - user_id: Identifier forwarded to the processing step

    Any failure, including an HTTP error status, is printed and not raised.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                # Fail on 4xx/5xx instead of parsing an error payload as if
                # it were the requested data.
                response.raise_for_status()
                data = await response.json()

        # Process the fetched data
        await process_external_data(data, user_id)

        print(f"Successfully processed external data for user {user_id}")
    except Exception as e:
        print(f"Failed to fetch external data: {str(e)}")
331
332
async def process_external_data(data: dict, user_id: int):
    """Simulate async processing: wait one second, then print the item count."""
    await asyncio.sleep(1)
    item_count = len(data)
    print(f"Processed data for user {user_id}: {item_count} items")
336
337
async def send_webhook(webhook_url: str, payload: dict):
    """
    POST *payload* as JSON to *webhook_url* and print the outcome.

    Parameters:
    - webhook_url: Address that receives the POST request
    - payload: Data sent as the JSON body

    Errors are printed and not raised.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(webhook_url, json=payload) as response:
                # Any 2xx status is a successful delivery; receivers often
                # answer 201, 202 or 204 rather than exactly 200.
                if 200 <= response.status < 300:
                    print("Webhook sent successfully")
                else:
                    print(f"Webhook failed with status {response.status}")
    except Exception as e:
        print(f"Webhook error: {str(e)}")
347
348
@app.post("/trigger-external-fetch/{user_id}")
async def trigger_external_fetch(
    user_id: int,
    data_url: str,
    webhook_url: str,
    background_tasks: BackgroundTasks
):
    """Queue the external fetch and the webhook notification, then acknowledge."""
    # NOTE(review): data_url and webhook_url come straight from the request
    # and are requested by the server; restrict them to trusted hosts in a
    # real application.
    background_tasks.add_task(fetch_external_data, data_url, user_id)
    background_tasks.add_task(
        send_webhook,
        webhook_url,
        {"user_id": user_id, "action": "external_fetch_triggered"},
    )

    return {"message": f"External data fetch triggered for user {user_id}"}
363
```
364
365
### Background Tasks with Error Handling
366
367
```python
368
import logging
369
from fastapi import FastAPI, BackgroundTasks
370
371
# Configure logging
372
logging.basicConfig(level=logging.INFO)
373
logger = logging.getLogger(__name__)
374
375
app = FastAPI()
376
377
def safe_background_task(task_name: str, *args, **kwargs):
    """
    Wrapper for background tasks with error handling.

    Parameters:
    - task_name: One of "send_notification", "process_data" or "cleanup"
    - args: Positional arguments forwarded to the selected task
    - kwargs: Keyword arguments forwarded to the selected task

    An unknown task name or an exception raised by the task is logged with
    its traceback and not re-raised.
    """
    try:
        logger.info(f"Starting background task: {task_name}")

        # Built on each call so the task functions, which are defined
        # further down the module, are looked up at run time.
        handlers = {
            "send_notification": send_notification_task,
            "process_data": process_data_task,
            "cleanup": cleanup_task,
        }
        handler = handlers.get(task_name)
        if handler is None:
            raise ValueError(f"Unknown task: {task_name}")

        handler(*args, **kwargs)

        logger.info(f"Completed background task: {task_name}")

    except Exception as e:
        # logger.exception records the traceback as well as the message,
        # which logger.error alone would drop.
        logger.exception("Background task %s failed: %s", task_name, e)
        # In a real app, you might want to retry, alert admins, etc.
397
398
def send_notification_task(user_id: int, message: str):
    """
    Print a notification line for *user_id*.

    Raises:
    - ValueError: if *user_id* is falsy (for example 0 or None)
    """
    if user_id:
        print(f"Notification sent to user {user_id}: {message}")
        return
    raise ValueError("User ID is required")
402
403
def process_data_task(data_id: int):
    """
    Print a confirmation line for *data_id*.

    Raises:
    - ValueError: if *data_id* is zero or negative
    """
    is_invalid = data_id <= 0
    if is_invalid:
        raise ValueError("Invalid data ID")
    print(f"Processed data {data_id}")
407
408
def cleanup_task():
    """Print a fixed message marking the end of the cleanup step."""
    done_message = "Cleanup completed"
    print(done_message)
410
411
@app.post("/safe-task/{user_id}")
async def create_safe_task(user_id: int, message: str, background_tasks: BackgroundTasks):
    """Queue the notification and cleanup tasks through the safe wrapper."""
    # Both tasks go through safe_background_task for its error handling.
    enqueue = background_tasks.add_task
    enqueue(safe_background_task, "send_notification", user_id, message)
    enqueue(safe_background_task, "cleanup")

    return {"message": "Tasks scheduled with error handling"}
418
```
419
420
### Background Tasks with Progress Tracking
421
422
```python
423
import time
424
from typing import Dict
425
from fastapi import FastAPI, BackgroundTasks
426
427
app = FastAPI()
428
429
# In-memory progress tracking (use Redis or database in production)
430
task_progress: Dict[str, dict] = {}
431
432
def long_running_task(task_id: str, items_count: int):
    """
    Simulate *items_count* half-second work items while recording progress.

    The entry task_progress[task_id] is created as "running", updated after
    every item, and finally marked "completed", or "failed" with the error
    text if an exception occurs.
    """
    task_progress[task_id] = {
        "status": "running",
        "progress": 0,
        "total": items_count,
        "message": "Starting task..."
    }

    try:
        for done in range(1, items_count + 1):
            # Simulate work
            time.sleep(0.5)

            # Update progress
            task_progress[task_id].update(
                progress=done,
                message=f"Processing item {done} of {items_count}",
            )

        # Task completed
        task_progress[task_id].update(
            status="completed",
            message="Task completed successfully",
        )

    except Exception as exc:
        task_progress[task_id].update(
            status="failed",
            message=f"Task failed: {str(exc)}",
        )
462
463
@app.post("/start-task/{task_id}")
async def start_task(task_id: str, items_count: int, background_tasks: BackgroundTasks):
    """
    Start long_running_task in the background under *task_id*.

    Parameters:
    - task_id: Identifier used as the key in task_progress
    - items_count: Number of simulated items to process
    - background_tasks: Container the task is added to

    Returns an error payload if *task_id* is already known; otherwise the
    task id and the URL at which its progress can be read.
    """
    if task_id in task_progress:
        return {"error": "Task with this ID already exists"}

    # Reserve the id now. The background function only writes its own entry
    # once it starts running, after this response, so without this a second
    # request for the same id would pass the check above, and the progress
    # URL returned below would report "Task not found" until then.
    task_progress[task_id] = {
        "status": "pending",
        "progress": 0,
        "total": items_count,
        "message": "Task queued"
    }

    background_tasks.add_task(long_running_task, task_id, items_count)

    return {
        "message": f"Task {task_id} started",
        "task_id": task_id,
        "check_progress_url": f"/task-progress/{task_id}"
    }
475
476
@app.get("/task-progress/{task_id}")
async def get_task_progress(task_id: str):
    """Return the stored progress entry for *task_id*, or an error payload."""
    try:
        return task_progress[task_id]
    except KeyError:
        return {"error": "Task not found"}
482
```
483
484
### Background Tasks with Dependency Injection
485
486
```python
487
from fastapi import FastAPI, BackgroundTasks, Depends
488
489
app = FastAPI()
490
491
class EmailService:
    """Stand-in mail service that only prints what it would send."""

    def send_email(self, to: str, subject: str, body: str):
        """Print the recipient and subject; *body* is accepted but not used."""
        summary = f"Sending email to {to}: {subject}"
        print(summary)
494
495
class DatabaseService:
    """Stand-in persistence service that only prints the activity."""

    def log_activity(self, user_id: int, action: str):
        """Print a log line for *user_id* and *action*."""
        entry = f"Logging: User {user_id} performed {action}"
        print(entry)
498
499
# Dependency providers
500
def get_email_service() -> EmailService:
    """Dependency provider: build a new EmailService on each call."""
    service = EmailService()
    return service
502
503
def get_database_service() -> DatabaseService:
    """Dependency provider: build a new DatabaseService on each call."""
    service = DatabaseService()
    return service
505
506
def notification_task(
    user_id: int,
    action: str,
    email_service: EmailService,
    db_service: DatabaseService
):
    """Log *action* through db_service, then email the user through email_service."""
    # The services are the instances passed in by the route handler.
    db_service.log_activity(user_id, action)

    recipient = f"user{user_id}@example.com"
    email_body = f"You performed: {action}"
    email_service.send_email(recipient, "Action Performed", email_body)
519
520
@app.post("/action/{user_id}")
async def perform_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks,
    email_service: EmailService = Depends(get_email_service),
    db_service: DatabaseService = Depends(get_database_service)
):
    """Queue notification_task with the injected services, then acknowledge."""
    # Pass dependencies to background task
    task_args = (user_id, action, email_service, db_service)
    background_tasks.add_task(notification_task, *task_args)

    return {"message": f"Action {action} performed for user {user_id}"}
538
```