or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

api-routing.mdbackground-tasks.mdcore-application.mddependency-injection.mdexception-handling.mdfile-handling.mdindex.mdparameter-declaration.mdrequest-response.mdwebsocket-support.md

background-tasks.mddocs/

0

# Background Tasks

1

2

FastAPI's background task system allows execution of functions after the HTTP response is sent to the client. This enables operations like sending emails, logging, file processing, or cleanup tasks without making the client wait for these operations to complete.

3

4

## Capabilities

5

6

### BackgroundTasks Class

7

8

Container for managing multiple background tasks that execute after response completion.

9

10

```python { .api }

11

class BackgroundTasks:

12

"""

13

Container for background tasks to be executed after response is sent.

14

15

Background tasks run in the same process after the HTTP response

16

is delivered to the client, allowing for cleanup, logging, or

17

notification operations without blocking the response.

18

"""

19

20

def add_task(

21

self,

22

func: Callable[..., Any],

23

*args: Any,

24

**kwargs: Any

25

) -> None:

26

"""

27

Add a function to be executed as a background task.

28

29

Parameters:

30

- func: Function to execute (can be sync or async)

31

- args: Positional arguments to pass to the function

32

- kwargs: Keyword arguments to pass to the function

33

34

Behaviors:

35

- Tasks execute in the order they were added

36

- Async functions are awaited, sync functions run in thread pool

37

- Exceptions in background tasks are logged but don't affect response

38

- Tasks run after response is sent but before connection closes

39

- Multiple tasks can be added and will execute sequentially

40

"""

41

```

42

43

### Background Task Execution

44

45

Background tasks support both synchronous and asynchronous functions with automatic handling.

46

47

```python { .api }

48

# Sync background task function

49

def sync_background_task(param1: str, param2: int):

50

"""

51

Synchronous background task function.

52

53

Parameters:

54

- param1, param2: Function-specific parameters

55

56

Note: Sync functions run in thread pool to avoid blocking

57

"""

58

59

# Async background task function

60

async def async_background_task(param1: str, param2: int):

61

"""

62

Asynchronous background task function.

63

64

Parameters:

65

- param1, param2: Function-specific parameters

66

67

Note: Async functions are awaited directly

68

"""

69

```

70

71

## Usage Examples

72

73

### Basic Background Tasks

74

75

```python

76

from fastapi import FastAPI, BackgroundTasks

77

import logging

78

79

app = FastAPI()

80

81

# Set up logging

82

logging.basicConfig(level=logging.INFO)

83

logger = logging.getLogger(__name__)

84

85

def write_log(message: str, level: str = "info"):

86

"""Simple logging background task."""

87

if level == "info":

88

logger.info(message)

89

elif level == "error":

90

logger.error(message)

91

elif level == "warning":

92

logger.warning(message)

93

94

def send_notification(user_email: str, message: str):

95

"""Simulate sending email notification."""

96

print(f"Sending email to {user_email}: {message}")

97

# In real application, this would use an email service

98

# time.sleep(2) # Simulate email sending delay

99

100

@app.post("/items/")

101

async def create_item(item_data: dict, background_tasks: BackgroundTasks):

102

# Process the item creation

103

item_id = save_item(item_data) # Your item saving logic

104

105

# Add background tasks

106

background_tasks.add_task(

107

write_log,

108

f"Item {item_id} created successfully",

109

"info"

110

)

111

112

background_tasks.add_task(

113

send_notification,

114

"admin@example.com",

115

f"New item created: {item_data.get('name', 'Unknown')}"

116

)

117

118

# Response is sent immediately, tasks run after

119

return {"item_id": item_id, "status": "created"}

120

121

@app.delete("/items/{item_id}")

122

async def delete_item(item_id: int, background_tasks: BackgroundTasks):

123

# Delete the item

124

item_name = delete_item_from_db(item_id) # Your deletion logic

125

126

# Log deletion in background

127

background_tasks.add_task(

128

write_log,

129

f"Item {item_id} ({item_name}) deleted",

130

"info"

131

)

132

133

return {"message": "Item deleted"}

134

```

135

136

### Async Background Tasks

137

138

```python

139

from fastapi import FastAPI, BackgroundTasks

140

import asyncio

141

import aiohttp

142

import aiofiles

143

144

app = FastAPI()

145

146

async def async_log_to_file(message: str, filename: str = "app.log"):

147

"""Async background task for file logging."""

148

async with aiofiles.open(filename, mode="a") as file:

149

timestamp = datetime.now().isoformat()

150

await file.write(f"{timestamp}: {message}\n")

151

152

async def send_webhook(webhook_url: str, data: dict):

153

"""Async background task for sending webhook."""

154

async with aiohttp.ClientSession() as session:

155

try:

156

async with session.post(webhook_url, json=data) as response:

157

if response.status == 200:

158

print(f"Webhook sent successfully to {webhook_url}")

159

else:

160

print(f"Webhook failed: {response.status}")

161

except Exception as e:

162

print(f"Webhook error: {e}")

163

164

async def process_image_async(image_path: str, user_id: int):

165

"""Async background task for image processing."""

166

print(f"Starting image processing for user {user_id}")

167

168

# Simulate async image processing

169

await asyncio.sleep(2)

170

171

# Update database with processing result

172

await update_user_image_status(user_id, "processed")

173

print(f"Image processing completed for user {user_id}")

174

175

@app.post("/upload-image/")

176

async def upload_image(

177

image_data: dict,

178

user_id: int,

179

background_tasks: BackgroundTasks

180

):

181

# Save image immediately

182

image_path = save_image(image_data) # Your image saving logic

183

184

# Add async background tasks

185

background_tasks.add_task(

186

async_log_to_file,

187

f"Image uploaded by user {user_id}: {image_path}"

188

)

189

190

background_tasks.add_task(

191

process_image_async,

192

image_path,

193

user_id

194

)

195

196

background_tasks.add_task(

197

send_webhook,

198

"https://external-service.com/webhook",

199

{"event": "image_uploaded", "user_id": user_id, "image_path": image_path}

200

)

201

202

return {"message": "Image uploaded", "status": "processing"}

203

```

204

205

### Error Handling in Background Tasks

206

207

```python

208

from fastapi import FastAPI, BackgroundTasks

209

import logging

210

211

app = FastAPI()

212

logger = logging.getLogger(__name__)

213

214

def risky_background_task(data: dict):

215

"""Background task that might fail."""

216

try:

217

# Simulate some risky operation

218

if data.get("cause_error"):

219

raise ValueError("Simulated error in background task")

220

221

print(f"Processing data: {data}")

222

# Actual processing logic here

223

224

except Exception as e:

225

# Log the error (errors in background tasks don't affect response)

226

logger.error(f"Background task failed: {e}")

227

228

# Could also send error notification, update database, etc.

229

send_error_notification(str(e))

230

231

def send_error_notification(error_message: str):

232

"""Send notification about background task error."""

233

print(f"ERROR NOTIFICATION: {error_message}")

234

235

def cleanup_on_error(resource_id: str):

236

"""Cleanup resources if main task fails."""

237

print(f"Cleaning up resource: {resource_id}")

238

239

@app.post("/process-data/")

240

async def process_data(data: dict, background_tasks: BackgroundTasks):

241

resource_id = create_resource() # Your resource creation logic

242

243

try:

244

# Main processing

245

result = process_main_data(data) # Your main logic

246

247

# Add successful processing background task

248

background_tasks.add_task(risky_background_task, data)

249

250

return {"result": result, "resource_id": resource_id}

251

252

except Exception as e:

253

# If main processing fails, add cleanup background task

254

background_tasks.add_task(cleanup_on_error, resource_id)

255

raise # Re-raise to return error response

256

```

257

258

### Complex Background Task Workflows

259

260

```python

261

from fastapi import FastAPI, BackgroundTasks

262

from typing import List

263

import json

264

265

app = FastAPI()

266

267

def step1_validate_data(order_id: int, data: dict):

268

"""First step: validate order data."""

269

print(f"Step 1: Validating order {order_id}")

270

# Validation logic here

271

272

if not data.get("customer_id"):

273

raise ValueError("Customer ID is required")

274

275

print(f"Order {order_id} validation completed")

276

277

def step2_update_inventory(order_id: int, items: List[dict]):

278

"""Second step: update inventory."""

279

print(f"Step 2: Updating inventory for order {order_id}")

280

281

for item in items:

282

# Update inventory logic

283

print(f"Updated inventory for item {item['id']}: -{item['quantity']}")

284

285

print(f"Inventory update completed for order {order_id}")

286

287

def step3_send_confirmation(order_id: int, customer_email: str):

288

"""Third step: send confirmation email."""

289

print(f"Step 3: Sending confirmation for order {order_id} to {customer_email}")

290

291

# Email sending logic here

292

print(f"Confirmation email sent for order {order_id}")

293

294

def step4_notify_fulfillment(order_id: int, items: List[dict]):

295

"""Fourth step: notify fulfillment center."""

296

print(f"Step 4: Notifying fulfillment center for order {order_id}")

297

298

fulfillment_data = {

299

"order_id": order_id,

300

"items": items,

301

"priority": "standard"

302

}

303

304

# Send to fulfillment system

305

print(f"Fulfillment notification sent: {json.dumps(fulfillment_data)}")

306

307

@app.post("/orders/")

308

async def create_order(order_data: dict, background_tasks: BackgroundTasks):

309

# Create order immediately

310

order_id = create_order_record(order_data) # Your order creation logic

311

312

# Add sequential background tasks

313

# These will execute in the order they were added

314

background_tasks.add_task(

315

step1_validate_data,

316

order_id,

317

order_data

318

)

319

320

background_tasks.add_task(

321

step2_update_inventory,

322

order_id,

323

order_data.get("items", [])

324

)

325

326

background_tasks.add_task(

327

step3_send_confirmation,

328

order_id,

329

order_data.get("customer_email")

330

)

331

332

background_tasks.add_task(

333

step4_notify_fulfillment,

334

order_id,

335

order_data.get("items", [])

336

)

337

338

return {

339

"order_id": order_id,

340

"status": "created",

341

"message": "Order created, processing in background"

342

}

343

```

344

345

### Background Tasks with External Services

346

347

```python

348

from fastapi import FastAPI, BackgroundTasks

349

import requests

350

import time

351

from datetime import datetime

352

353

app = FastAPI()

354

355

def sync_with_external_service(user_id: int, user_data: dict):

356

"""Sync user data with external CRM system."""

357

crm_endpoint = "https://crm.example.com/api/users"

358

359

try:

360

response = requests.post(

361

f"{crm_endpoint}/{user_id}",

362

json=user_data,

363

timeout=30

364

)

365

366

if response.status_code == 200:

367

print(f"User {user_id} synced with CRM successfully")

368

log_sync_success(user_id)

369

else:

370

print(f"CRM sync failed for user {user_id}: {response.status_code}")

371

log_sync_failure(user_id, response.status_code)

372

373

except requests.RequestException as e:

374

print(f"CRM sync error for user {user_id}: {e}")

375

log_sync_failure(user_id, str(e))

376

377

def generate_report(report_type: str, user_id: int, filters: dict):

378

"""Generate report in background."""

379

print(f"Starting {report_type} report generation for user {user_id}")

380

381

# Simulate report generation

382

time.sleep(5) # This would be actual report processing

383

384

report_data = {

385

"type": report_type,

386

"user_id": user_id,

387

"filters": filters,

388

"generated_at": datetime.now().isoformat(),

389

"status": "completed"

390

}

391

392

# Save report to storage

393

report_id = save_report(report_data) # Your storage logic

394

395

# Notify user that report is ready

396

notify_report_ready(user_id, report_id)

397

398

print(f"Report {report_id} generated successfully")

399

400

def cleanup_temp_files(file_paths: List[str]):

401

"""Clean up temporary files."""

402

for file_path in file_paths:

403

try:

404

os.remove(file_path)

405

print(f"Removed temp file: {file_path}")

406

except OSError as e:

407

print(f"Failed to remove {file_path}: {e}")

408

409

@app.post("/users/{user_id}")

410

async def update_user(

411

user_id: int,

412

user_data: dict,

413

background_tasks: BackgroundTasks

414

):

415

# Update user in main database

416

updated_user = update_user_record(user_id, user_data) # Your update logic

417

418

# Sync with external services in background

419

background_tasks.add_task(

420

sync_with_external_service,

421

user_id,

422

user_data

423

)

424

425

return {"user": updated_user, "status": "updated"}

426

427

@app.post("/reports/generate")

428

async def request_report(

429

report_request: dict,

430

current_user_id: int,

431

background_tasks: BackgroundTasks

432

):

433

# Validate request immediately

434

if not report_request.get("type"):

435

raise HTTPException(status_code=400, detail="Report type is required")

436

437

# Start report generation in background

438

background_tasks.add_task(

439

generate_report,

440

report_request["type"],

441

current_user_id,

442

report_request.get("filters", {})

443

)

444

445

return {

446

"message": "Report generation started",

447

"status": "processing",

448

"estimated_time": "5-10 minutes"

449

}

450

451

@app.post("/files/process")

452

async def process_files(

453

file_data: dict,

454

background_tasks: BackgroundTasks

455

):

456

# Process files and create temp files

457

temp_files = create_temp_files(file_data) # Your file processing

458

processed_data = process_file_data(temp_files) # Your processing logic

459

460

# Clean up temp files in background

461

background_tasks.add_task(cleanup_temp_files, temp_files)

462

463

return {"processed_data": processed_data, "temp_files_count": len(temp_files)}

464

```

465

466

### Background Tasks with Database Operations

467

468

```python

469

from fastapi import FastAPI, BackgroundTasks, Depends

470

from sqlalchemy.orm import Session

471

472

app = FastAPI()

473

474

def update_user_stats(user_id: int, action: str, db_session: Session):

475

"""Update user statistics in background."""

476

try:

477

# Update user statistics

478

stats = db_session.query(UserStats).filter(UserStats.user_id == user_id).first()

479

480

if not stats:

481

stats = UserStats(user_id=user_id)

482

db_session.add(stats)

483

484

if action == "login":

485

stats.login_count += 1

486

stats.last_login = datetime.now()

487

elif action == "purchase":

488

stats.purchase_count += 1

489

stats.last_purchase = datetime.now()

490

491

db_session.commit()

492

print(f"Updated stats for user {user_id}: {action}")

493

494

except Exception as e:

495

db_session.rollback()

496

print(f"Failed to update stats for user {user_id}: {e}")

497

498

finally:

499

db_session.close()

500

501

def log_audit_event(user_id: int, event_type: str, details: dict, db_session: Session):

502

"""Log audit events in background."""

503

try:

504

audit_log = AuditLog(

505

user_id=user_id,

506

event_type=event_type,

507

details=json.dumps(details),

508

timestamp=datetime.now()

509

)

510

511

db_session.add(audit_log)

512

db_session.commit()

513

print(f"Logged audit event: {event_type} for user {user_id}")

514

515

except Exception as e:

516

db_session.rollback()

517

print(f"Failed to log audit event: {e}")

518

519

finally:

520

db_session.close()

521

522

@app.post("/login")

523

async def login(

524

credentials: dict,

525

background_tasks: BackgroundTasks,

526

db: Session = Depends(get_db)

527

):

528

# Authenticate user

529

user = authenticate_user(credentials) # Your auth logic

530

531

if not user:

532

raise HTTPException(status_code=401, detail="Invalid credentials")

533

534

# Create new database session for background task

535

# (the original session will be closed when request ends)

536

background_db = SessionLocal()

537

538

# Update login statistics in background

539

background_tasks.add_task(

540

update_user_stats,

541

user.id,

542

"login",

543

background_db

544

)

545

546

# Log login event in background

547

background_tasks.add_task(

548

log_audit_event,

549

user.id,

550

"login",

551

{"ip_address": request.client.host, "user_agent": request.headers.get("user-agent")},

552

SessionLocal() # Another new session

553

)

554

555

return {"access_token": create_access_token(user.id)}

556

```

557

558

### Testing Background Tasks

559

560

```python

561

from fastapi import FastAPI, BackgroundTasks

562

from fastapi.testclient import TestClient

563

import pytest

564

from unittest.mock import patch, Mock

565

566

app = FastAPI()

567

568

# Mock functions for testing

569

sent_emails = []

570

logged_messages = []

571

572

def mock_send_email(recipient: str, subject: str, body: str):

573

"""Mock email function for testing."""

574

sent_emails.append({

575

"recipient": recipient,

576

"subject": subject,

577

"body": body

578

})

579

580

def mock_log_message(message: str):

581

"""Mock logging function for testing."""

582

logged_messages.append(message)

583

584

@app.post("/test-endpoint")

585

async def test_endpoint(data: dict, background_tasks: BackgroundTasks):

586

background_tasks.add_task(

587

mock_send_email,

588

"test@example.com",

589

"Test Subject",

590

f"Test body: {data.get('message', '')}"

591

)

592

593

background_tasks.add_task(

594

mock_log_message,

595

f"Processed data: {data}"

596

)

597

598

return {"status": "success"}

599

600

def test_background_tasks():

601

client = TestClient(app)

602

603

# Clear mock data

604

sent_emails.clear()

605

logged_messages.clear()

606

607

# Make request

608

response = client.post("/test-endpoint", json={"message": "test"})

609

610

# Check response

611

assert response.status_code == 200

612

assert response.json() == {"status": "success"}

613

614

# Check background tasks were executed

615

# Note: In TestClient, background tasks run synchronously

616

assert len(sent_emails) == 1

617

assert sent_emails[0]["recipient"] == "test@example.com"

618

assert sent_emails[0]["subject"] == "Test Subject"

619

assert "test" in sent_emails[0]["body"]

620

621

assert len(logged_messages) == 1

622

assert "test" in logged_messages[0]

623

624

@patch('your_module.actual_send_email')

625

def test_background_tasks_with_mocks(mock_send_email):

626

"""Test background tasks with proper mocking."""

627

client = TestClient(app)

628

629

response = client.post("/test-endpoint", json={"message": "test"})

630

631

assert response.status_code == 200

632

633

# Verify the mock was called

634

mock_send_email.assert_called_once_with(

635

"test@example.com",

636

"Test Subject",

637

"Test body: test"

638

)

639

```