or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

- advanced-responses.md
- api-routing.md
- background-tasks.md
- core-application.md
- data-utilities.md
- dependency-injection.md
- exception-handling.md
- index.md
- middleware.md
- request-parameters.md
- request-response.md
- security-authentication.md
- static-templating.md
- testing.md
- websocket-support.md

docs/background-tasks.md

0

# Background Tasks

1

2

FastAPI provides built-in support for background tasks that execute after sending the HTTP response to the client. This allows you to perform time-consuming operations like sending emails, processing files, or updating databases without making the client wait for the response.

3

4

## Capabilities

5

6

### Background Tasks Class

7

8

Class for managing and executing background tasks after HTTP response completion.

9

10

```python { .api }

11

class BackgroundTasks:
    """Container for tasks that are executed after the HTTP response."""

    def __init__(self, tasks: List[Task] = None) -> None:
        """
        Background tasks container.

        Parameters:
        - tasks: Optional list of initial tasks to execute. None (the
          default) or an empty list yields an empty container.
        """
        # Copy the incoming list so that tasks added later never mutate the
        # caller's own list object.
        self.tasks = list(tasks) if tasks else []

    def add_task(
        self,
        func: Callable,
        *args: Any,
        **kwargs: Any
    ) -> None:
        """
        Add background task to execute after response.

        Parameters:
        - func: Function to execute in background
        - args: Positional arguments for the function
        - kwargs: Keyword arguments for the function
        """

35

```

36

37

### Task Execution Interface

38

39

Functions can be added as background tasks with any signature and parameters.

40

41

```python { .api }

42

def background_task_function(*args: Any, **kwargs: Any) -> None:
    """
    Background task function signature.

    Parameters:
    - args: Positional arguments passed from add_task
    - kwargs: Keyword arguments passed from add_task

    Note: Regular (non-async) background tasks are run in a worker thread
    after the response has been sent
    """

52

53

async def async_background_task_function(*args: Any, **kwargs: Any) -> None:
    """
    Async background task function signature.

    Parameters:
    - args: Positional arguments passed from add_task
    - kwargs: Keyword arguments passed from add_task

    Note: Async background tasks are awaited after response
    """

63

```

64

65

### Background Tasks Dependency

66

67

Background tasks can be injected as dependencies into route handlers.

68

69

```python { .api }

70

def route_handler(
    background_tasks: BackgroundTasks,
    # other parameters...
) -> Any:
    """
    Route handler with background tasks dependency.

    Parameters:
    - background_tasks: BackgroundTasks instance for adding tasks

    The BackgroundTasks instance is automatically provided by FastAPI
    """

82

```

83

84

## Usage Examples

85

86

### Basic Background Task

87

88

```python

89

from fastapi import FastAPI, BackgroundTasks

90

91

app = FastAPI()

92

93

def write_notification(email: str, message: str = ""):
    """
    Write a single notification line to ``log.txt``.

    Parameters:
    - email: Address the notification is for
    - message: Notification text (empty by default)

    The file is opened in "w" mode, so every call replaces the previous
    content of ``log.txt``.
    """
    content = f"notification for {email}: {message}\n"
    # Explicit encoding keeps the output identical across platforms.
    with open("log.txt", mode="w", encoding="utf-8") as email_file:
        email_file.write(content)

97

98

@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    """
    Schedule ``write_notification`` for ``email`` and respond immediately.

    Parameters:
    - email: Path parameter forwarded to the background task
    - background_tasks: BackgroundTasks instance the task is added to
    """
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

102

```

103

104

### Multiple Background Tasks

105

106

```python

107

from fastapi import FastAPI, BackgroundTasks

108

import time

109

110

app = FastAPI()

111

112

def slow_task_1(name: str):
    """Simulate two seconds of blocking work, then report completion for ``name``."""
    time.sleep(2)
    print(f"Task 1 completed for {name}")

115

116

def slow_task_2(name: str):
    """Simulate three seconds of blocking work, then report completion for ``name``."""
    time.sleep(3)
    print(f"Task 2 completed for {name}")

119

120

def cleanup_task():
    """Print a confirmation that cleanup has finished."""
    print("Cleanup completed")

122

123

@app.post("/process/{name}")
async def process_data(name: str, background_tasks: BackgroundTasks):
    """
    Queue the two slow tasks and the cleanup task, then respond immediately.

    Parameters:
    - name: Path parameter forwarded to both slow tasks
    - background_tasks: BackgroundTasks instance the tasks are added to
    """
    # Add multiple background tasks
    background_tasks.add_task(slow_task_1, name)
    background_tasks.add_task(slow_task_2, name)
    background_tasks.add_task(cleanup_task)

    return {"message": f"Processing started for {name}"}

131

```

132

133

### Background Task with Email Sending

134

135

```python

136

from fastapi import FastAPI, BackgroundTasks

137

import smtplib

138

from email.mime.text import MIMEText

139

from email.mime.multipart import MIMEMultipart

140

141

app = FastAPI()

142

143

def send_email(to_email: str, subject: str, body: str):
    """
    Send a plain-text email over SMTP using STARTTLS.

    Parameters:
    - to_email: Recipient address
    - subject: Value of the Subject header
    - body: Plain-text message body

    Connection settings are read from the SMTP_SERVER, SMTP_PORT,
    SENDER_EMAIL and SENDER_PASSWORD environment variables; the placeholder
    values below are used for any variable that is not set. Failures are
    printed rather than raised.
    """
    import os  # local import: this snippet does not import os at the top

    try:
        # Email configuration comes from the environment so that real
        # credentials never have to be written into the source.
        smtp_server = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
        smtp_port = int(os.environ.get("SMTP_PORT", "587"))
        sender_email = os.environ.get("SENDER_EMAIL", "your-email@gmail.com")
        sender_password = os.environ.get("SENDER_PASSWORD", "your-password")

        # Create message
        message = MIMEMultipart()
        message["From"] = sender_email
        message["To"] = to_email
        message["Subject"] = subject
        message.attach(MIMEText(body, "plain"))

        # Send email
        with smtplib.SMTP(smtp_server, smtp_port) as server:
            server.starttls()
            server.login(sender_email, sender_password)
            server.send_message(message)

        print(f"Email sent successfully to {to_email}")
    except Exception as e:
        print(f"Failed to send email: {str(e)}")

168

169

@app.post("/send-email/")
async def send_email_endpoint(
    to_email: str,
    subject: str,
    body: str,
    background_tasks: BackgroundTasks
):
    """
    Queue ``send_email`` with the given fields and respond immediately.

    Parameters:
    - to_email: Recipient address forwarded to the task
    - subject: Subject forwarded to the task
    - body: Message body forwarded to the task
    - background_tasks: BackgroundTasks instance the task is added to
    """
    background_tasks.add_task(send_email, to_email, subject, body)
    return {"message": "Email will be sent in the background"}

178

```

179

180

### Background Task with File Processing

181

182

```python

183

import os

184

import csv

185

from typing import List

186

from fastapi import FastAPI, BackgroundTasks, UploadFile, File

187

188

app = FastAPI()

189

190

def process_csv_file(filename: str, user_id: int):
    """
    Process every row of a CSV file, then delete the file.

    Parameters:
    - filename: Path of the CSV file to read; it is removed afterwards
    - user_id: Passed to the row processor and to the notification helpers

    Errors are printed and reported through ``notify_user_error`` instead of
    being raised.
    """
    try:
        processed_rows = 0

        # newline="" lets the csv module handle line endings itself, which
        # matters for quoted fields that contain embedded newlines.
        with open(filename, 'r', newline='') as file:
            for row in csv.DictReader(file):
                # Process each row (simulate some work)
                process_csv_row(row, user_id)
                processed_rows += 1

        # Log completion
        print(f"Processed {processed_rows} rows for user {user_id}")

        # Notify user (in a real app, you might update a database or send a webhook)
        notify_user_completion(user_id, processed_rows)

    except Exception as e:
        print(f"Error processing CSV: {str(e)}")
        notify_user_error(user_id, str(e))

    finally:
        # Clean up temporary file on success and on failure alike, so a bad
        # upload does not leave the file behind.
        try:
            os.remove(filename)
        except OSError as cleanup_error:
            print(f"Could not remove {filename}: {cleanup_error}")

213

214

def process_csv_row(row: dict, user_id: int):
    """Simulate processing of one CSV row by printing it together with ``user_id``."""
    # Simulate row processing
    print(f"Processing row for user {user_id}: {row}")

217

218

def notify_user_completion(user_id: int, row_count: int):
    """Print a completion notice for ``user_id`` stating how many rows were processed."""
    # In a real application, this might send a push notification or update a database
    print(f"Notifying user {user_id}: processed {row_count} rows")

221

222

def notify_user_error(user_id: int, error_message: str):
    """Print an error notice for ``user_id`` containing ``error_message``."""
    print(f"Notifying user {user_id} of error: {error_message}")

224

225

@app.post("/upload-csv/{user_id}")
async def upload_csv(
    user_id: int,
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...)
):
    """
    Store an uploaded CSV in a temporary file and queue it for processing.

    Parameters:
    - user_id: Path parameter forwarded to the processing task
    - background_tasks: BackgroundTasks instance the task is added to
    - file: The uploaded file

    Returns a confirmation message that echoes the client's file name.
    """
    import tempfile  # local import: this snippet does not import tempfile at the top

    # Save uploaded file temporarily. The client-supplied file name is
    # untrusted (it may contain path separators or clash with another upload),
    # so the path is generated by tempfile and never built from file.filename.
    with tempfile.NamedTemporaryFile(
        mode="wb", prefix=f"temp_{user_id}_", suffix=".csv", delete=False
    ) as temp_file:
        temp_filename = temp_file.name
        # Copy in chunks so a large upload is not held in memory all at once.
        while True:
            chunk = await file.read(1024 * 1024)
            if not chunk:
                break
            temp_file.write(chunk)

    # Process file in background
    background_tasks.add_task(process_csv_file, temp_filename, user_id)

    return {"message": f"File {file.filename} uploaded and will be processed in background"}

242

```

243

244

### Background Task with Database Operations

245

246

```python

247

from fastapi import FastAPI, BackgroundTasks

248

from sqlalchemy import create_engine, Column, Integer, String, DateTime

249

from sqlalchemy.ext.declarative import declarative_base

250

from sqlalchemy.orm import sessionmaker

251

from datetime import datetime

252

253

app = FastAPI()

254

255

# Database setup (simplified)

256

Base = declarative_base()

257

engine = create_engine("sqlite:///./test.db")

258

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

259

260

class ActivityLog(Base):
    """ORM model for one row of the ``activity_logs`` table."""

    __tablename__ = "activity_logs"

    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, index=True)
    action = Column(String)
    # The callable is passed uncalled, so it is evaluated per insert.
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12 and returns
    # a naive datetime; consider a timezone-aware default.
    timestamp = Column(DateTime, default=datetime.utcnow)

267

268

Base.metadata.create_all(bind=engine)

269

270

def log_user_activity(user_id: int, action: str):
    """
    Insert one ActivityLog row for ``user_id`` / ``action`` and commit it.

    Parameters:
    - user_id: Stored in the row's user_id column
    - action: Stored in the row's action column

    On failure the error is printed and the session is rolled back; the
    session is closed in every case.
    """
    db = SessionLocal()
    try:
        activity = ActivityLog(user_id=user_id, action=action)
        db.add(activity)
        db.commit()
        print(f"Logged activity: User {user_id} performed {action}")
    except Exception as e:
        print(f"Failed to log activity: {str(e)}")
        db.rollback()
    finally:
        db.close()

282

283

def update_user_statistics(user_id: int):
    """
    Placeholder for recalculating statistics of ``user_id``.

    Opens a session, prints a confirmation and closes the session again; no
    query is issued yet. Errors are printed rather than raised.
    """
    db = SessionLocal()
    try:
        # Update user statistics based on recent activity
        # This is a placeholder for complex statistical calculations
        print(f"Updated statistics for user {user_id}")
    except Exception as e:
        print(f"Failed to update statistics: {str(e)}")
    finally:
        db.close()

293

294

@app.post("/user/{user_id}/action")
async def perform_user_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks
):
    """
    Queue activity logging and a statistics update, then respond immediately.

    Parameters:
    - user_id: Path parameter forwarded to both tasks
    - action: Name of the action, forwarded to the logging task
    - background_tasks: BackgroundTasks instance the tasks are added to
    """
    # Log the activity in background
    background_tasks.add_task(log_user_activity, user_id, action)

    # Update user statistics in background
    background_tasks.add_task(update_user_statistics, user_id)

    return {"message": f"Action '{action}' recorded for user {user_id}"}

307

```

308

309

### Async Background Tasks

310

311

```python

312

import asyncio

313

import aiohttp

314

from fastapi import FastAPI, BackgroundTasks

315

316

app = FastAPI()

317

318

async def fetch_external_data(url: str, user_id: int):
    """
    Download JSON from ``url`` and hand it to ``process_external_data``.

    Parameters:
    - url: Address to fetch with an HTTP GET
    - user_id: Forwarded to the processing step and used in the log lines

    Errors are printed rather than raised.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url) as response:
                # Treat 4xx/5xx replies as failures instead of processing
                # an error body as if it were data.
                response.raise_for_status()
                data = await response.json()

        # Process the fetched data
        await process_external_data(data, user_id)

        print(f"Successfully processed external data for user {user_id}")
    except Exception as e:
        print(f"Failed to fetch external data: {str(e)}")

330

331

async def process_external_data(data: dict, user_id: int):
    """Simulate one second of async processing, then print how many items ``data`` holds."""
    # Simulate async processing
    await asyncio.sleep(1)
    print(f"Processed data for user {user_id}: {len(data)} items")

335

336

async def send_webhook(webhook_url: str, payload: dict):
    """
    POST ``payload`` as JSON to ``webhook_url`` and print the outcome.

    Parameters:
    - webhook_url: Address that receives the POST request
    - payload: Data sent as the JSON request body

    Any 2xx status counts as success. Errors are printed rather than raised.
    """
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(webhook_url, json=payload) as response:
                # Receivers commonly acknowledge with 201, 202 or 204, so the
                # whole 2xx range is accepted rather than 200 only.
                if 200 <= response.status < 300:
                    print("Webhook sent successfully")
                else:
                    print(f"Webhook failed with status {response.status}")
    except Exception as e:
        print(f"Webhook error: {str(e)}")

346

347

@app.post("/trigger-external-fetch/{user_id}")
async def trigger_external_fetch(
    user_id: int,
    data_url: str,
    webhook_url: str,
    background_tasks: BackgroundTasks
):
    """
    Queue the external fetch and the webhook call, then respond immediately.

    Parameters:
    - user_id: Path parameter forwarded to the fetch task and put in the payload
    - data_url: Address the fetch task downloads from
    - webhook_url: Address the webhook task posts to
    - background_tasks: BackgroundTasks instance the tasks are added to
    """
    # Fetch external data in background
    background_tasks.add_task(fetch_external_data, data_url, user_id)

    # Send webhook notification in background
    payload = {"user_id": user_id, "action": "external_fetch_triggered"}
    background_tasks.add_task(send_webhook, webhook_url, payload)

    return {"message": f"External data fetch triggered for user {user_id}"}

362

```

363

364

### Background Tasks with Error Handling

365

366

```python

367

import logging

368

from fastapi import FastAPI, BackgroundTasks

369

370

# Configure logging

371

logging.basicConfig(level=logging.INFO)

372

logger = logging.getLogger(__name__)

373

374

app = FastAPI()

375

376

def safe_background_task(task_name: str, *args, **kwargs):
    """
    Wrapper for background tasks with error handling

    Parameters:
    - task_name: One of "send_notification", "process_data" or "cleanup"
    - args: Positional arguments forwarded to the selected task
    - kwargs: Keyword arguments forwarded to the selected task

    Every failure, including an unknown ``task_name``, is logged and not
    raised.
    """
    try:
        logger.info(f"Starting background task: {task_name}")

        # Determine which task to run based on task_name
        handlers = {
            "send_notification": send_notification_task,
            "process_data": process_data_task,
            "cleanup": cleanup_task,
        }
        handler = handlers.get(task_name)
        if handler is None:
            raise ValueError(f"Unknown task: {task_name}")
        handler(*args, **kwargs)

        logger.info(f"Completed background task: {task_name}")

    except Exception as e:
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception(f"Background task {task_name} failed: {str(e)}")
        # In a real app, you might want to retry, alert admins, etc.

396

397

def send_notification_task(user_id: int, message: str):
    """
    Print a notification for ``user_id``.

    Parameters:
    - user_id: Recipient of the notification; must be truthy
    - message: Text of the notification

    Raises:
    - ValueError: If ``user_id`` is falsy (for example 0 or None)
    """
    if not user_id:
        raise ValueError("User ID is required")
    print(f"Notification sent to user {user_id}: {message}")

401

402

def process_data_task(data_id: int):
    """
    Print a confirmation that ``data_id`` was processed.

    Parameters:
    - data_id: Identifier of the data; must be greater than zero

    Raises:
    - ValueError: If ``data_id`` is zero or negative
    """
    if data_id <= 0:
        raise ValueError("Invalid data ID")
    print(f"Processed data {data_id}")

406

407

def cleanup_task():
    """Print a confirmation that cleanup has finished."""
    print("Cleanup completed")

409

410

@app.post("/safe-task/{user_id}")
async def create_safe_task(user_id: int, message: str, background_tasks: BackgroundTasks):
    """
    Queue a notification and a cleanup through ``safe_background_task``.

    Parameters:
    - user_id: Path parameter forwarded to the notification task
    - message: Notification text forwarded to the notification task
    - background_tasks: BackgroundTasks instance the tasks are added to
    """
    # Use the safe wrapper for error handling
    background_tasks.add_task(safe_background_task, "send_notification", user_id, message)
    background_tasks.add_task(safe_background_task, "cleanup")

    return {"message": "Tasks scheduled with error handling"}

417

```

418

419

### Background Tasks with Progress Tracking

420

421

```python

422

import time

423

from typing import Dict

424

from fastapi import FastAPI, BackgroundTasks

425

426

app = FastAPI()

427

428

# In-memory progress tracking (use Redis or database in production)

429

task_progress: Dict[str, dict] = {}

430

431

def long_running_task(task_id: str, items_count: int):
    """
    Simulate a long job and record its progress in ``task_progress``.

    Parameters:
    - task_id: Key under which the progress record is stored
    - items_count: Number of simulated items; each one takes half a second

    The record's status moves from "running" to "completed", or to "failed"
    if an exception occurs while the items are processed.
    """
    task_progress[task_id] = {
        "status": "running",
        "progress": 0,
        "total": items_count,
        "message": "Starting task..."
    }

    try:
        for i in range(items_count):
            # Simulate work
            time.sleep(0.5)

            # Update progress
            task_progress[task_id].update({
                "progress": i + 1,
                "message": f"Processing item {i + 1} of {items_count}"
            })

        # Task completed
        task_progress[task_id].update({
            "status": "completed",
            "message": "Task completed successfully"
        })

    except Exception as e:
        task_progress[task_id].update({
            "status": "failed",
            "message": f"Task failed: {str(e)}"
        })

461

462

@app.post("/start-task/{task_id}")
async def start_task(task_id: str, items_count: int, background_tasks: BackgroundTasks):
    """
    Queue ``long_running_task`` under ``task_id`` unless that ID is taken.

    Parameters:
    - task_id: Path parameter identifying the task
    - items_count: Number of items the task should process
    - background_tasks: BackgroundTasks instance the task is added to

    Returns an error payload when ``task_id`` is already known, otherwise a
    payload with the task ID and the URL for polling its progress.
    """
    if task_id in task_progress:
        return {"error": "Task with this ID already exists"}

    # Reserve the ID right away. The task only starts after the response has
    # been sent, so without this entry a second request arriving in between
    # would pass the check above and start a duplicate.
    task_progress[task_id] = {
        "status": "pending",
        "progress": 0,
        "total": items_count,
        "message": "Task queued"
    }

    background_tasks.add_task(long_running_task, task_id, items_count)

    return {
        "message": f"Task {task_id} started",
        "task_id": task_id,
        "check_progress_url": f"/task-progress/{task_id}"
    }

474

475

@app.get("/task-progress/{task_id}")
async def get_task_progress(task_id: str):
    """
    Return the stored progress record of ``task_id``.

    Parameters:
    - task_id: Path parameter identifying the task

    Returns an error payload when no record exists for ``task_id``.
    """
    if task_id not in task_progress:
        return {"error": "Task not found"}

    return task_progress[task_id]

481

```

482

483

### Background Tasks with Dependency Injection

484

485

```python

486

from fastapi import FastAPI, BackgroundTasks, Depends

487

488

app = FastAPI()

489

490

class EmailService:
    """Example email service that only prints what it would send."""

    def send_email(self, to: str, subject: str, body: str):
        """Print the recipient and subject; ``body`` is accepted but not printed."""
        print(f"Sending email to {to}: {subject}")

493

494

class DatabaseService:
    """Example database service that only prints what it would record."""

    def log_activity(self, user_id: int, action: str):
        """Print which ``action`` was performed by ``user_id``."""
        print(f"Logging: User {user_id} performed {action}")

497

498

# Dependency providers

499

def get_email_service() -> EmailService:
    """Dependency provider that returns a new EmailService instance on every call."""
    return EmailService()

501

502

def get_database_service() -> DatabaseService:
    """Dependency provider that returns a new DatabaseService instance on every call."""
    return DatabaseService()

504

505

def notification_task(
    user_id: int,
    action: str,
    email_service: EmailService,
    db_service: DatabaseService
):
    """
    Record ``action`` for ``user_id`` and email the user about it.

    Parameters:
    - user_id: User the action belongs to; also used to build the address
    - action: Name of the action that was performed
    - email_service: Service whose send_email method is called
    - db_service: Service whose log_activity method is called

    The activity is logged first, then the email is sent.
    """
    # Use injected services in background task
    db_service.log_activity(user_id, action)
    email_service.send_email(
        f"user{user_id}@example.com",
        "Action Performed",
        f"You performed: {action}"
    )

518

519

@app.post("/action/{user_id}")
async def perform_action(
    user_id: int,
    action: str,
    background_tasks: BackgroundTasks,
    email_service: EmailService = Depends(get_email_service),
    db_service: DatabaseService = Depends(get_database_service)
):
    """
    Queue ``notification_task`` with the injected services and respond.

    Parameters:
    - user_id: Path parameter forwarded to the task
    - action: Name of the action, forwarded to the task
    - background_tasks: BackgroundTasks instance the task is added to
    - email_service: EmailService resolved through get_email_service
    - db_service: DatabaseService resolved through get_database_service
    """
    # Pass dependencies to background task
    background_tasks.add_task(
        notification_task,
        user_id,
        action,
        email_service,
        db_service
    )

    return {"message": f"Action {action} performed for user {user_id}"}

537

```