# Exception Handling

Complete exception hierarchy for Celery task errors, retry mechanisms, timeout handling, backend errors, and worker-related exceptions. These exceptions provide fine-grained error handling and control flow for robust distributed task processing.

## Capabilities

### Core Exceptions

Base exceptions and fundamental error types that form the foundation of Celery's error handling system.

```python { .api }
class CeleryError(Exception):
    """
    Base exception for all Celery errors.
    """

class ImproperlyConfigured(CeleryError):
    """
    Raised when Celery is improperly configured.

    Common causes:
    - Missing broker configuration
    - Invalid serializer settings
    - Incorrect backend configuration
    """

class SecurityError(CeleryError):
    """
    Raised when security-related operations fail.

    Common causes:
    - Invalid message signatures
    - Authentication failures
    - SSL/TLS errors
    """

class OperationalError(Exception):
    """
    Raised when transport connection error occurs while sending messages.

    Note:
        This exception does not inherit from CeleryError as it comes
        from the kombu messaging library.

    Common causes:
    - Broker connection failures
    - Network timeouts
    - Authentication errors with message broker
    """
```
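
In practice, `OperationalError` is the one most often seen in application code: it surfaces when a task message cannot be published, for example because the broker is unreachable. A minimal sketch of catching it at the call site (the broker URL, `ping` task, and `enqueue_safely` helper are illustrative assumptions, not part of the API above):

```python
from celery import Celery
from celery.exceptions import OperationalError

app = Celery('core_example', broker='amqp://guest@localhost//')

@app.task
def ping():
    return 'pong'

def enqueue_safely():
    """Publish the task, tolerating a temporarily unreachable broker."""
    try:
        return ping.delay()
    except OperationalError as exc:
        # Raised while sending the message, e.g. broker down or connection refused.
        print(f"Could not queue task: {exc}")
        return None
```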

### Task Execution Exceptions

Exceptions related to task execution, retry logic, and task lifecycle management.

```python { .api }
class TaskError(CeleryError):
    """Base exception for task-related errors."""

class TaskPredicate(CeleryError):
    """
    Base class for task predicate exceptions.

    These exceptions control task execution flow rather than
    indicating actual errors.
    """

class Retry(TaskPredicate):
    """
    Exception raised to trigger task retry.

    Attributes:
        message (str): Retry reason message
        exc (Exception): Original exception that caused retry
        when (datetime): When to retry (eta)
    """

    def __init__(self, message=None, exc=None, when=None, **kwargs):
        """
        Create retry exception.

        Args:
            message (str): Retry message
            exc (Exception): Causing exception
            when (datetime): Retry time
            **kwargs: Additional retry options
        """

class Ignore(TaskPredicate):
    """
    Exception raised to ignore task result.

    When raised, the task state will not be updated and no result
    will be stored in the result backend.
    """

class Reject(TaskPredicate):
    """
    Exception raised to reject and requeue task.

    Args:
        reason (str): Rejection reason
        requeue (bool): Whether to requeue the task
    """

    def __init__(self, reason=None, requeue=False):
        """
        Create reject exception.

        Args:
            reason (str): Reason for rejection
            requeue (bool): Requeue the task
        """

class NotRegistered(TaskError):
    """
    Raised when attempting to execute unregistered task.

    Args:
        task_name (str): Name of unregistered task
    """

class AlreadyRegistered(TaskError):
    """
    Raised when attempting to register already registered task.

    Args:
        task_name (str): Name of already registered task
    """

class MaxRetriesExceededError(TaskError):
    """
    Raised when task exceeds maximum retry attempts.

    Attributes:
        task_args (tuple): Task arguments
        task_kwargs (dict): Task keyword arguments
        task_name (str): Task name
        task_id (str): Task ID
    """

class TaskRevokedError(TaskError):
    """
    Raised when trying to execute revoked task.

    Args:
        message (str): Revocation reason
        task_id (str): Revoked task ID
    """

class InvalidTaskError(TaskError):
    """
    Raised when task data is invalid or corrupted.

    Common causes:
    - Malformed message body
    - Missing required task attributes
    - Invalid task signature
    """

class ChordError(TaskError):
    """
    Raised when chord callback fails or chord is misconfigured.

    Args:
        message (str): Error description
        callback (str): Callback task name
    """

class QueueNotFound(TaskError):
    """
    Raised when task is routed to a queue not in conf.queues.

    Args:
        queue_name (str): Name of the missing queue
    """

class IncompleteStream(TaskError):
    """
    Raised when end of data stream is found but data isn't complete.

    Common causes:
    - Network interruption during message transfer
    - Corrupted message payload
    - Premature connection termination
    """
```
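
A behavioural note on the predicates above: `Ignore` and `Reject` end the task without marking it as failed, and `Reject(requeue=True)` only affects the broker message if it has not been acknowledged yet, so it is normally paired with late acknowledgement. A short sketch under that assumption (the `drain_item` task and its `poison`/`duplicate` fields are hypothetical):

```python
from celery import Celery
from celery.exceptions import Ignore, Reject

app = Celery('predicate_example')

@app.task(bind=True, acks_late=True)  # late ack so Reject can requeue or discard the message
def drain_item(self, payload):
    if payload.get('poison'):
        # Discard the message permanently instead of requeuing it forever.
        raise Reject('poison message', requeue=False)
    if payload.get('duplicate'):
        # Stop here without storing any result or failure state.
        raise Ignore()
    return {'handled': payload['id']}
```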

### Timeout Exceptions

Time-related exceptions for handling task execution limits and timeouts.

```python { .api }
class TimeoutError(CeleryError):
    """
    Raised when operation times out.

    Args:
        operation (str): Operation that timed out
        timeout (float): Timeout value in seconds
    """

class SoftTimeLimitExceeded(Exception):
    """
    Raised when task exceeds soft time limit.

    This exception allows tasks to perform cleanup before
    hard termination occurs.
    """

class TimeLimitExceeded(Exception):
    """
    Raised when task exceeds hard time limit.

    This exception indicates immediate task termination.
    """
```
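
Note that `TimeoutError` is the caller-side timeout: `AsyncResult.get(timeout=...)` raises it when a result does not arrive in time, whereas the two time-limit exceptions are raised inside the worker. A small sketch (the broker and backend URLs are placeholder assumptions):

```python
from celery import Celery
from celery.exceptions import TimeoutError

app = Celery('timeout_example',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    return x + y

result = add.delay(2, 2)
try:
    value = result.get(timeout=5)  # wait at most 5 seconds for the result
except TimeoutError:
    # The result was not ready in time; poll again later or treat as a failure.
    value = None
```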

### Backend Exceptions

Exceptions related to result backend operations and storage systems.

```python { .api }
class BackendError(CeleryError):
    """
    Base exception for result backend errors.
    """

class BackendGetMetaError(BackendError):
    """
    Raised when backend fails to retrieve task metadata.

    Args:
        task_id (str): Task ID that failed to retrieve
        backend_type (str): Type of backend
    """

class BackendStoreError(BackendError):
    """
    Raised when backend fails to store task result.

    Args:
        task_id (str): Task ID that failed to store
        result: Result that failed to store
        backend_type (str): Type of backend
    """
```
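
Whether these errors reach application code at all can depend on backend retry settings; recent Celery versions can transparently retry recoverable backend failures before giving up. A hedged configuration sketch (the setting names should be verified against the Celery release in use, and the Redis URL is an assumption):

```python
from celery import Celery

app = Celery('backend_example', backend='redis://localhost:6379/1')

# Retry transient backend failures internally before surfacing
# BackendGetMetaError / BackendStoreError to the caller.
app.conf.update(
    result_backend_always_retry=True,  # retry on exceptions the backend marks as recoverable
    result_backend_max_retries=10,     # raise after this many failed attempts
)
```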

### Worker Exceptions

Exceptions related to worker processes, including termination and process management.

```python { .api }
class WorkerLostError(Exception):
    """
    Raised when worker process is lost unexpectedly.

    Args:
        message (str): Loss description
        exitcode (int): Worker exit code
    """

class Terminated(Exception):
    """
    Raised when worker process is terminated.

    Args:
        signum (int): Signal number that caused termination
        reason (str): Termination reason
    """

class WorkerShutdown(SystemExit):
    """
    Exception raised to signal worker shutdown.

    Args:
        msg (str): Shutdown message
        exitcode (int): Exit code
    """

class WorkerTerminate(SystemExit):
    """
    Exception raised to signal immediate worker termination.

    Args:
        msg (str): Termination message
        exitcode (int): Exit code
    """
```
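
Application code rarely raises these directly; they matter mostly when deciding what should happen to a task whose worker dies. One relevant knob is the `reject_on_worker_lost` task option paired with late acknowledgement, sketched below under the assumption that the option exists in your Celery version and that occasional duplicate execution is acceptable:

```python
from celery import Celery

app = Celery('worker_example')

# If the worker process is killed mid-task (WorkerLostError), requeue the
# message instead of marking the task as failed. The task may then run twice.
@app.task(bind=True, acks_late=True, reject_on_worker_lost=True)
def resumable_import(self, batch_id):
    return f"imported batch {batch_id}"  # placeholder body
```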

### Warning Classes

Warning categories for non-fatal issues that should be brought to the user's attention.

```python { .api }
class CeleryWarning(UserWarning):
    """Base warning class for Celery."""

class AlwaysEagerIgnored(CeleryWarning):
    """
    Warning raised when task_always_eager is ignored.

    Occurs when eager execution is requested but not supported
    in the current context.
    """

class DuplicateNodenameWarning(CeleryWarning):
    """
    Warning raised when duplicate worker node names are detected.

    Can cause issues with worker management and monitoring.
    """

class FixupWarning(CeleryWarning):
    """
    Warning raised during fixup operations.

    Indicates potential compatibility or configuration issues.
    """

class NotConfigured(CeleryWarning):
    """
    Warning raised when required configuration is missing.

    Indicates that default values are being used where
    explicit configuration would be preferred.
    """
```
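
Since all of these derive from `UserWarning`, they cooperate with Python's standard `warnings` filters. A small sketch that escalates Celery warnings to errors, for example in a test suite:

```python
import warnings

from celery.exceptions import CeleryWarning, NotConfigured

# Treat any Celery warning as an error so misconfiguration fails fast in CI.
warnings.simplefilter('error', CeleryWarning)

try:
    warnings.warn('no result backend configured', NotConfigured)
except NotConfigured as exc:
    print(f"configuration problem: {exc}")
```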

328

329

## Usage Examples

330

331

### Basic Exception Handling

332

333

```python

334

from celery import Celery

335

from celery.exceptions import (

336

Retry, Ignore, Reject, MaxRetriesExceededError,

337

SoftTimeLimitExceeded, TimeLimitExceeded

338

)

339

340

app = Celery('exception_example')

341

342

@app.task(bind=True, max_retries=3)

343

def unreliable_task(self, data):

344

"""Task with comprehensive error handling."""

345

346

try:

347

# Simulate unreliable operation

348

if random.random() < 0.3:

349

raise ConnectionError("Network is down")

350

351

# Process data

352

result = process_data(data)

353

return result

354

355

except SoftTimeLimitExceeded:

356

# Cleanup before hard termination

357

cleanup_resources()

358

raise

359

360

except ConnectionError as exc:

361

# Retry on network errors with exponential backoff

362

countdown = 2 ** self.request.retries

363

raise self.retry(countdown=countdown, exc=exc, max_retries=5)

364

365

except ValueError as exc:

366

# Don't retry on data validation errors

367

raise Ignore(f"Invalid data: {exc}")

368

369

except Exception as exc:

370

# Log unexpected errors and retry

371

logger.error(f"Unexpected error in task {self.request.id}: {exc}")

372

raise self.retry(countdown=60, exc=exc)

373

374

def process_data(data):

375

"""Simulate data processing."""

376

if not data:

377

raise ValueError("Empty data")

378

return f"processed_{data}"

379

380

def cleanup_resources():

381

"""Cleanup before task termination."""

382

print("Cleaning up resources...")

383

```

### Retry Logic with Custom Exceptions

```python
import logging

import requests

from celery.exceptions import Ignore, MaxRetriesExceededError

logger = logging.getLogger(__name__)

@app.task(bind=True, max_retries=5)
def api_call_task(self, url, data):
    """Task that handles API failures with smart retry logic."""

    try:
        response = requests.post(url, json=data, timeout=30)
        response.raise_for_status()
        return response.json()

    except requests.exceptions.Timeout as exc:
        # Retry timeouts with a longer delay
        raise self.retry(countdown=30, exc=exc)

    except requests.exceptions.ConnectionError as exc:
        # Retry connection errors with exponential backoff
        countdown = min(2 ** self.request.retries, 300)  # Max 5 minutes
        raise self.retry(countdown=countdown, exc=exc)

    except requests.exceptions.HTTPError as exc:
        # Don't retry client errors (4xx), do retry server errors (5xx)
        if 400 <= exc.response.status_code < 500:
            raise Ignore(f"Client error {exc.response.status_code}: {exc}")
        else:
            raise self.retry(countdown=60, exc=exc)

    except MaxRetriesExceededError as exc:
        # Log final failure and send alert
        logger.error(f"API call to {url} failed after all retries")
        send_failure_alert.delay(url, str(exc))
        raise

@app.task
def send_failure_alert(url, error):
    """Send alert for permanent failures."""
    # Send notification to ops team
    pass
```

### Task Rejection and Requeuing

```python
import logging

import psutil

from celery.exceptions import Reject

logger = logging.getLogger(__name__)

# Late acknowledgement is required for Reject(requeue=True) to put the
# message back on the queue; otherwise it has already been acked.
@app.task(bind=True, acks_late=True)
def memory_intensive_task(self, large_data):
    """Task that rejects itself if system memory is low."""

    # Check available memory
    memory = psutil.virtual_memory()
    if memory.percent > 85:  # More than 85% memory used
        logger.warning("System memory high, rejecting task for requeue")
        raise Reject("High memory usage", requeue=True)

    try:
        # Memory intensive processing
        result = process_large_data(large_data)
        return result

    except MemoryError:
        # Reject and requeue if we run out of memory
        logger.error("Task ran out of memory, requeuing")
        raise Reject("Out of memory", requeue=True)

def process_large_data(data):
    """Simulate memory intensive processing."""
    return f"processed_{len(data)}_items"
```

### Time Limit Handling

```python
import logging
import time

from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded

logger = logging.getLogger(__name__)

@app.task(bind=True, soft_time_limit=300, time_limit=320)  # 5min soft, 5min 20s hard
def long_running_task(self, items):
    """Task with graceful timeout handling."""

    processed_items = []
    start = time.monotonic()
    soft_limit = 300  # keep in sync with soft_time_limit above

    try:
        for i, item in enumerate(items):
            # Periodically check how close we are to the soft time limit
            if i % 100 == 0 and soft_limit - (time.monotonic() - start) < 30:
                logger.warning("Approaching soft time limit, saving progress")
                save_partial_results.delay(processed_items)

            result = process_item(item)
            processed_items.append(result)

        return processed_items

    except SoftTimeLimitExceeded:
        # Graceful cleanup on soft limit
        logger.info("Soft time limit exceeded, saving partial results")
        save_partial_results.delay(processed_items)

        # Continue processing remaining items in a new task
        remaining_items = items[len(processed_items):]
        if remaining_items:
            long_running_task.apply_async(args=[remaining_items], countdown=5)

        return f"Partial completion: {len(processed_items)} items processed"

@app.task
def save_partial_results(results):
    """Save partial results for recovery."""
    # Save to database or file
    logger.info(f"Saved {len(results)} partial results")

def process_item(item):
    """Process individual item."""
    time.sleep(0.1)  # Simulate processing time
    return f"processed_{item}"
```

### Backend Error Handling

```python
import json
import logging
import os
import traceback

from celery.exceptions import BackendError, BackendStoreError

logger = logging.getLogger(__name__)

@app.task(bind=True, ignore_result=False)
def critical_task(self, important_data):
    """Task with explicit backend error handling."""

    try:
        # Critical processing
        result = perform_critical_operation(important_data)

        # Try to store result explicitly
        try:
            self.update_state(state='SUCCESS', meta={'result': result})
        except BackendStoreError as exc:
            # Backend failed, store locally as fallback
            logger.error(f"Failed to store result in backend: {exc}")
            store_result_locally(self.request.id, result)

        return result

    except Exception as exc:
        # Ensure error is logged even if backend fails
        try:
            self.update_state(
                state='FAILURE',
                meta={'error': str(exc), 'traceback': traceback.format_exc()}
            )
        except BackendError:
            # Backend completely unavailable
            logger.critical(f"Backend unavailable, task {self.request.id} result lost")
            store_result_locally(self.request.id, {'error': str(exc)})

        raise

def perform_critical_operation(data):
    """Simulate critical operation."""
    return f"critical_result_{data}"

def store_result_locally(task_id, result):
    """Store result locally when backend fails."""
    # Store in a local file as a last resort
    os.makedirs('/tmp/celery_results', exist_ok=True)
    with open(f'/tmp/celery_results/{task_id}.json', 'w') as f:
        json.dump({'task_id': task_id, 'result': result}, f)
```

560

561

### Worker Process Management

562

563

```python

564

from celery.exceptions import WorkerLostError, Terminated

565

from celery.signals import worker_process_shutdown, task_failure

566

567

@worker_process_shutdown.connect

568

def handle_worker_shutdown(sender=None, pid=None, exitcode=None, **kwargs):

569

"""Handle worker process shutdown."""

570

571

if exitcode != 0:

572

logger.error(f"Worker process {pid} died unexpectedly with exit code {exitcode}")

573

574

# Notify monitoring system

575

notify_ops_team.delay(f"Worker {pid} crashed with exit code {exitcode}")

576

577

@task_failure.connect

578

def handle_task_failure(sender=None, task_id=None, exception=None, **kwargs):

579

"""Handle task failures, including worker-related ones."""

580

581

if isinstance(exception, WorkerLostError):

582

logger.error(f"Task {task_id} failed due to worker loss")

583

584

# Requeue critical tasks

585

if sender.name in ['critical_task', 'payment_processing']:

586

requeue_critical_task.delay(task_id, sender.name)

587

588

elif isinstance(exception, Terminated):

589

logger.warning(f"Task {task_id} was terminated")

590

591

@app.task

592

def notify_ops_team(message):

593

"""Send notification to operations team."""

594

# Send Slack message, email, etc.

595

pass

596

597

@app.task

598

def requeue_critical_task(task_id, task_name):

599

"""Requeue critical tasks that failed due to worker issues."""

600

# Logic to requeue the task

601

pass

602

```

603

604

### Custom Exception Classes

605

606

```python

607

from celery.exceptions import TaskError, CeleryError

608

609

class DataValidationError(TaskError):

610

"""Custom exception for data validation failures."""

611

612

def __init__(self, field, value, message=None):

613

self.field = field

614

self.value = value

615

self.message = message or f"Invalid {field}: {value}"

616

super().__init__(self.message)

617

618

class ExternalServiceError(CeleryError):

619

"""Custom exception for external service failures."""

620

621

def __init__(self, service_name, error_code, message=None):

622

self.service_name = service_name

623

self.error_code = error_code

624

self.message = message or f"{service_name} error {error_code}"

625

super().__init__(self.message)

626

627

@app.task(bind=True, max_retries=3)

628

def validate_and_process(self, user_data):

629

"""Task using custom exceptions."""

630

631

try:

632

# Validate data

633

if not user_data.get('email'):

634

raise DataValidationError('email', user_data.get('email'), 'Email is required')

635

636

if '@' not in user_data['email']:

637

raise DataValidationError('email', user_data['email'], 'Invalid email format')

638

639

# Call external service

640

response = call_external_api(user_data)

641

642

if response.status_code != 200:

643

raise ExternalServiceError('UserAPI', response.status_code, response.text)

644

645

return response.json()

646

647

except DataValidationError:

648

# Don't retry validation errors

649

raise Ignore(f"Data validation failed: {exc}")

650

651

except ExternalServiceError as exc:

652

if exc.error_code >= 500:

653

# Retry server errors

654

raise self.retry(countdown=30, exc=exc)

655

else:

656

# Don't retry client errors

657

raise Ignore(f"External service error: {exc}")

658

659

def call_external_api(data):

660

"""Simulate external API call."""

661

class MockResponse:

662

status_code = 200

663

def json(self):

664

return {'processed': True}

665

return MockResponse()

666

```

667

668

### Comprehensive Error Recovery

669

670

```python

671

from celery.exceptions import *

672

import sys

673

674

@app.task(bind=True, max_retries=5, default_retry_delay=60)

675

def robust_task(self, operation_data):

676

"""Task with comprehensive error handling and recovery."""

677

678

try:

679

result = perform_operation(operation_data)

680

return result

681

682

except SoftTimeLimitExceeded:

683

# Save progress and continue in new task

684

save_checkpoint.delay(operation_data, 'timeout')

685

return {'status': 'timeout', 'checkpoint_saved': True}

686

687

except (ConnectionError, TimeoutError) as exc:

688

# Network issues - retry with backoff

689

countdown = min(2 ** self.request.retries * 60, 3600) # Max 1 hour

690

logger.warning(f"Network error, retrying in {countdown}s: {exc}")

691

raise self.retry(countdown=countdown, exc=exc)

692

693

except MemoryError:

694

# Out of memory - reject and requeue

695

logger.error("Out of memory, rejecting task for requeue")

696

raise Reject("Insufficient memory", requeue=True)

697

698

except ValueError as exc:

699

# Data errors - don't retry

700

logger.error(f"Data validation error: {exc}")

701

raise Ignore(f"Invalid data: {exc}")

702

703

except MaxRetriesExceededError:

704

# All retries exhausted - send to dead letter queue

705

logger.error(f"Task failed after {self.max_retries} retries")

706

send_to_dead_letter.delay(self.request.id, operation_data, str(exc))

707

raise

708

709

except Exception as exc:

710

# Unexpected errors - log and retry

711

logger.exception(f"Unexpected error in task {self.request.id}")

712

713

# Don't retry certain critical errors

714

if isinstance(exc, (SystemExit, KeyboardInterrupt)):

715

raise

716

717

# Generic retry for other errors

718

raise self.retry(exc=exc)

719

720

@app.task

721

def save_checkpoint(data, reason):

722

"""Save task checkpoint for recovery."""

723

logger.info(f"Saving checkpoint due to {reason}")

724

# Save to persistent storage

725

726

@app.task

727

def send_to_dead_letter(task_id, data, error):

728

"""Send failed task to dead letter queue for manual review."""

729

logger.error(f"Sending task {task_id} to dead letter queue: {error}")

730

# Store in dead letter queue/database

731

732

def perform_operation(data):

733

"""Simulate operation that might fail."""

734

if not data:

735

raise ValueError("No data provided")

736

return f"processed_{data}"

737

```