# Exception Handling

Complete exception hierarchy for Celery task errors, retry mechanisms, timeout handling, backend errors, and worker-related exceptions. These exceptions provide fine-grained error handling and control flow for robust distributed task processing.

## Capabilities

### Core Exceptions

Base exceptions and fundamental error types that form the foundation of Celery's error handling system.

```python { .api }
class CeleryError(Exception):
    """
    Base exception for all Celery errors.
    """

class ImproperlyConfigured(CeleryError):
    """
    Raised when Celery is improperly configured.

    Common causes:
    - Missing broker configuration
    - Invalid serializer settings
    - Incorrect backend configuration
    """

class SecurityError(CeleryError):
    """
    Raised when security-related operations fail.

    Common causes:
    - Invalid message signatures
    - Authentication failures
    - SSL/TLS errors
    """

class OperationalError(Exception):
    """
    Raised when a transport connection error occurs while sending messages.

    Note:
        This exception does not inherit from CeleryError because it is
        re-exported from the kombu messaging library.

    Common causes:
    - Broker connection failures
    - Network timeouts
    - Authentication errors with the message broker
    """
```
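Most of these base classes surface on the producer side, before a task ever runs. The following minimal sketch (the app name, broker URL, and `add` task are hypothetical) catches kombu's `OperationalError` when the broker is unreachable at publish time, so callers can degrade gracefully instead of crashing.

```python
import logging

from celery import Celery
from celery.exceptions import OperationalError

logger = logging.getLogger(__name__)
app = Celery('ops_example', broker='amqp://localhost')  # hypothetical broker URL

@app.task
def add(x, y):
    return x + y

def enqueue_addition(x, y):
    """Publish the add task, degrading gracefully if the broker is down."""
    try:
        # .delay() can raise kombu's OperationalError if the broker connection fails.
        return add.delay(x, y)
    except OperationalError as exc:
        logger.error("Could not reach the broker, task was not queued: %s", exc)
        return None
```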
### Task Execution Exceptions

Exceptions related to task execution, retry logic, and task lifecycle management.

```python { .api }
class TaskError(CeleryError):
    """Base exception for task-related errors."""

class TaskPredicate(CeleryError):
    """
    Base class for task predicate exceptions.

    These exceptions control task execution flow rather than
    indicating actual errors.
    """

class Retry(TaskPredicate):
    """
    Exception raised to trigger a task retry.

    Attributes:
        message (str): Retry reason message
        exc (Exception): Original exception that caused the retry
        when (datetime): When to retry (eta)
    """

    def __init__(self, message=None, exc=None, when=None, **kwargs):
        """
        Create retry exception.

        Args:
            message (str): Retry message
            exc (Exception): Causing exception
            when (datetime): Retry time
            **kwargs: Additional retry options
        """

class Ignore(TaskPredicate):
    """
    Exception raised to ignore the task result.

    When raised, the task state will not be updated and no result
    will be stored in the result backend.
    """

class Reject(TaskPredicate):
    """
    Exception raised to reject a task message, optionally requeuing it.

    Args:
        reason (str): Rejection reason
        requeue (bool): Whether to requeue the task
    """

    def __init__(self, reason=None, requeue=False):
        """
        Create reject exception.

        Args:
            reason (str): Reason for rejection
            requeue (bool): Requeue the task
        """

class NotRegistered(TaskError):
    """
    Raised when attempting to execute an unregistered task.

    Args:
        task_name (str): Name of the unregistered task
    """

class AlreadyRegistered(TaskError):
    """
    Raised when attempting to register an already registered task.

    Args:
        task_name (str): Name of the already registered task
    """

class MaxRetriesExceededError(TaskError):
    """
    Raised when a task exceeds its maximum retry attempts.

    Attributes:
        task_args (tuple): Task arguments
        task_kwargs (dict): Task keyword arguments
        task_name (str): Task name
        task_id (str): Task ID
    """

class TaskRevokedError(TaskError):
    """
    Raised when trying to execute a revoked task.

    Args:
        message (str): Revocation reason
        task_id (str): Revoked task ID
    """

class InvalidTaskError(TaskError):
    """
    Raised when task data is invalid or corrupted.

    Common causes:
    - Malformed message body
    - Missing required task attributes
    - Invalid task signature
    """

class ChordError(TaskError):
    """
    Raised when a chord callback fails or the chord is misconfigured.

    Args:
        message (str): Error description
        callback (str): Callback task name
    """

class QueueNotFound(TaskError):
    """
    Raised when a task is routed to a queue not defined in conf.queues.

    Args:
        queue_name (str): Name of the missing queue
    """

class IncompleteStream(TaskError):
    """
    Raised when the end of a data stream is reached but the data is incomplete.

    Common causes:
    - Network interruption during message transfer
    - Corrupted message payload
    - Premature connection termination
    """
```
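`NotRegistered` can also be caught on the producer side: the app's task registry raises it for unknown names, which lets callers fail fast instead of publishing a message no worker recognizes. A minimal sketch, assuming a configured `app`; `send_by_name` is an illustrative helper, not a Celery API.

```python
import logging

from celery import Celery
from celery.exceptions import NotRegistered

logger = logging.getLogger(__name__)
app = Celery('registry_example')

def send_by_name(name, *args, **kwargs):
    """Resolve a task by name before sending it, failing fast if it is unknown."""
    try:
        task = app.tasks[name]  # the task registry raises NotRegistered for unknown names
    except NotRegistered:
        logger.error("No task registered under %r", name)
        return None
    return task.apply_async(args=args, kwargs=kwargs)
```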
### Timeout Exceptions

Time-related exceptions for handling task execution limits and timeouts.

```python { .api }
class TimeoutError(CeleryError):
    """
    Raised when an operation times out.

    Args:
        operation (str): Operation that timed out
        timeout (float): Timeout value in seconds
    """

class SoftTimeLimitExceeded(Exception):
    """
    Raised when a task exceeds its soft time limit.

    This exception allows tasks to perform cleanup before
    hard termination occurs.
    """

class TimeLimitExceeded(Exception):
    """
    Raised when a task exceeds its hard time limit.

    This exception indicates immediate task termination.
    """
```
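Note that `celery.exceptions.TimeoutError` (distinct from the builtin `TimeoutError`) is what callers see when waiting on a result that does not arrive in time, while the two time-limit exceptions are raised inside the worker. A minimal sketch, assuming `async_result` is an `AsyncResult` returned by an earlier `.delay()` or `.apply_async()` call:

```python
from celery.exceptions import TimeoutError  # Celery's TimeoutError, not the builtin

def wait_briefly(async_result, seconds=5):
    """Wait a few seconds for a result and treat a slow task as still pending."""
    try:
        # AsyncResult.get() raises celery.exceptions.TimeoutError if the result
        # does not arrive within `timeout` seconds.
        return async_result.get(timeout=seconds)
    except TimeoutError:
        return None  # result was not ready in time
```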
### Backend Exceptions

Exceptions related to result backend operations and storage systems.

```python { .api }
class BackendError(CeleryError):
    """
    Base exception for result backend errors.
    """

class BackendGetMetaError(BackendError):
    """
    Raised when the backend fails to retrieve task metadata.

    Args:
        task_id (str): Task ID that failed to retrieve
        backend_type (str): Type of backend
    """

class BackendStoreError(BackendError):
    """
    Raised when the backend fails to store a task result.

    Args:
        task_id (str): Task ID that failed to store
        result: Result that failed to store
        backend_type (str): Type of backend
    """
```
### Worker Exceptions

Exceptions related to worker processes, including termination and process management.

```python { .api }
class WorkerLostError(Exception):
    """
    Raised when a worker process is lost unexpectedly.

    Args:
        message (str): Loss description
        exitcode (int): Worker exit code
    """

class Terminated(Exception):
    """
    Raised when a worker process is terminated.

    Args:
        signum (int): Signal number that caused termination
        reason (str): Termination reason
    """

class WorkerShutdown(SystemExit):
    """
    Exception raised to signal worker shutdown.

    Args:
        msg (str): Shutdown message
        exitcode (int): Exit code
    """

class WorkerTerminate(SystemExit):
    """
    Exception raised to signal immediate worker termination.

    Args:
        msg (str): Termination message
        exitcode (int): Exit code
    """
```
### Warning Classes

Warning categories for non-fatal issues that should be brought to user attention.

```python { .api }
class CeleryWarning(UserWarning):
    """Base warning class for Celery."""

class AlwaysEagerIgnored(CeleryWarning):
    """
    Warning raised when the task_always_eager setting is ignored.

    Occurs when a message is sent directly to the broker (for example
    via app.send_task()), which cannot honor eager execution.
    """

class DuplicateNodenameWarning(CeleryWarning):
    """
    Warning raised when duplicate worker node names are detected.

    Can cause issues with worker management and monitoring.
    """

class FixupWarning(CeleryWarning):
    """
    Warning raised during fixup operations.

    Indicates potential compatibility or configuration issues.
    """

class NotConfigured(CeleryWarning):
    """
    Warning raised when required configuration is missing.

    Indicates that default values are being used where
    explicit configuration would be preferred.
    """
```
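These are ordinary Python warning categories, so they can be filtered or escalated with the standard `warnings` module. A minimal sketch; the particular policy shown (silence everything except `NotConfigured`) is only an example:

```python
import warnings

from celery.exceptions import CeleryWarning, NotConfigured

# Filters are matched front to back and simplefilter() prepends, so register
# the broad rule first and the specific override second.
warnings.simplefilter("ignore", CeleryWarning)   # silence Celery warnings...
warnings.simplefilter("error", NotConfigured)    # ...but fail fast when unconfigured
```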
## Usage Examples

### Basic Exception Handling

```python
import logging
import random

from celery import Celery
from celery.exceptions import (
    Retry, Ignore, Reject, MaxRetriesExceededError,
    SoftTimeLimitExceeded, TimeLimitExceeded
)

logger = logging.getLogger(__name__)

app = Celery('exception_example')

@app.task(bind=True, max_retries=3)
def unreliable_task(self, data):
    """Task with comprehensive error handling."""
    try:
        # Simulate unreliable operation
        if random.random() < 0.3:
            raise ConnectionError("Network is down")

        # Process data
        result = process_data(data)
        return result

    except SoftTimeLimitExceeded:
        # Cleanup before hard termination
        cleanup_resources()
        raise

    except ConnectionError as exc:
        # Retry on network errors with exponential backoff
        countdown = 2 ** self.request.retries
        raise self.retry(countdown=countdown, exc=exc, max_retries=5)

    except ValueError as exc:
        # Don't retry on data validation errors
        raise Ignore(f"Invalid data: {exc}")

    except Exception as exc:
        # Log unexpected errors and retry
        logger.error(f"Unexpected error in task {self.request.id}: {exc}")
        raise self.retry(countdown=60, exc=exc)

def process_data(data):
    """Simulate data processing."""
    if not data:
        raise ValueError("Empty data")
    return f"processed_{data}"

def cleanup_resources():
    """Cleanup before task termination."""
    print("Cleaning up resources...")
```
### Retry Logic with Custom Exceptions

```python
import logging

import requests

from celery.exceptions import Ignore, MaxRetriesExceededError

logger = logging.getLogger(__name__)

@app.task(bind=True, max_retries=5)
def api_call_task(self, url, data):
    """Task that handles API failures with smart retry logic."""
    try:
        response = requests.post(url, json=data, timeout=30)
        response.raise_for_status()
        return response.json()

    except requests.exceptions.Timeout as exc:
        # Retry timeouts with a longer delay
        raise self.retry(countdown=30, exc=exc)

    except requests.exceptions.ConnectionError as exc:
        # Retry connection errors with exponential backoff
        countdown = min(2 ** self.request.retries, 300)  # Max 5 minutes
        raise self.retry(countdown=countdown, exc=exc)

    except requests.exceptions.HTTPError as exc:
        # Don't retry client errors (4xx), do retry server errors (5xx)
        if 400 <= exc.response.status_code < 500:
            raise Ignore(f"Client error {exc.response.status_code}: {exc}")
        else:
            raise self.retry(countdown=60, exc=exc)

    except MaxRetriesExceededError as exc:
        # Log final failure and send alert
        logger.error(f"API call to {url} failed after all retries")
        send_failure_alert.delay(url, str(exc))
        raise

@app.task
def send_failure_alert(url, error):
    """Send alert for permanent failures."""
    # Send notification to ops team
    pass
```
### Task Rejection and Requeuing

```python
import logging

import psutil

from celery.exceptions import Reject

logger = logging.getLogger(__name__)

# Late acknowledgement is required for Reject(..., requeue=True) to return
# the message to the queue.
@app.task(bind=True, acks_late=True)
def memory_intensive_task(self, large_data):
    """Task that rejects itself if system memory is low."""
    # Check available memory
    memory = psutil.virtual_memory()
    if memory.percent > 85:  # More than 85% memory used
        logger.warning("System memory high, rejecting task for requeue")
        raise Reject("High memory usage", requeue=True)

    try:
        # Memory intensive processing
        result = process_large_data(large_data)
        return result

    except MemoryError:
        # Reject and requeue if we run out of memory
        logger.error("Task ran out of memory, requeuing")
        raise Reject("Out of memory", requeue=True)

def process_large_data(data):
    """Simulate memory intensive processing."""
    return f"processed_{len(data)}_items"
```
### Time Limit Handling

```python
import logging
import time

from celery.exceptions import SoftTimeLimitExceeded

logger = logging.getLogger(__name__)

SOFT_LIMIT = 300  # seconds

@app.task(bind=True, soft_time_limit=SOFT_LIMIT, time_limit=320)  # 5 min soft, 5 min 20 s hard
def long_running_task(self, items):
    """Task with graceful timeout handling."""
    start = time.monotonic()
    processed_items = []

    try:
        for i, item in enumerate(items):
            # Periodically check how much of the soft time limit is left
            if i % 100 == 0:
                remaining = SOFT_LIMIT - (time.monotonic() - start)
                if remaining < 30:  # Less than 30 seconds left
                    logger.warning("Approaching soft time limit, saving progress")
                    save_partial_results.delay(processed_items)

            result = process_item(item)
            processed_items.append(result)

        return processed_items

    except SoftTimeLimitExceeded:
        # Graceful cleanup on soft limit
        logger.info("Soft time limit exceeded, saving partial results")
        save_partial_results.delay(processed_items)

        # Continue processing remaining items in a new task
        remaining_items = items[len(processed_items):]
        if remaining_items:
            long_running_task.apply_async(args=[remaining_items], countdown=5)

        return f"Partial completion: {len(processed_items)} items processed"

@app.task
def save_partial_results(results):
    """Save partial results for recovery."""
    # Save to database or file
    logger.info(f"Saved {len(results)} partial results")

def process_item(item):
    """Process individual item."""
    time.sleep(0.1)  # Simulate processing time
    return f"processed_{item}"
```
### Backend Error Handling

```python
import json
import logging
import os
import traceback

from celery.exceptions import BackendError, BackendStoreError

logger = logging.getLogger(__name__)

@app.task(bind=True, ignore_result=False)
def critical_task(self, important_data):
    """Task with explicit backend error handling."""
    try:
        # Critical processing
        result = perform_critical_operation(important_data)

        # Try to store result explicitly
        try:
            self.update_state(state='SUCCESS', meta={'result': result})
        except BackendStoreError as exc:
            # Backend failed, store locally as fallback
            logger.error(f"Failed to store result in backend: {exc}")
            store_result_locally(self.request.id, result)

        return result

    except Exception as exc:
        # Ensure error is logged even if backend fails
        try:
            self.update_state(
                state='FAILURE',
                meta={'error': str(exc), 'traceback': traceback.format_exc()}
            )
        except BackendError:
            # Backend completely unavailable
            logger.critical(f"Backend unavailable, task {self.request.id} result lost")
            store_result_locally(self.request.id, {'error': str(exc)})

        raise

def perform_critical_operation(data):
    """Simulate critical operation."""
    return f"critical_result_{data}"

def store_result_locally(task_id, result):
    """Store result locally when backend fails."""
    # Store in local file, database, etc.
    os.makedirs('/tmp/celery_results', exist_ok=True)
    with open(f'/tmp/celery_results/{task_id}.json', 'w') as f:
        json.dump({'task_id': task_id, 'result': result}, f)
```
### Worker Process Management

```python
import logging

from celery.exceptions import WorkerLostError, Terminated
from celery.signals import worker_process_shutdown, task_failure

logger = logging.getLogger(__name__)

@worker_process_shutdown.connect
def handle_worker_shutdown(sender=None, pid=None, exitcode=None, **kwargs):
    """Handle worker process shutdown."""
    if exitcode != 0:
        logger.error(f"Worker process {pid} died unexpectedly with exit code {exitcode}")

        # Notify monitoring system
        notify_ops_team.delay(f"Worker {pid} crashed with exit code {exitcode}")

@task_failure.connect
def handle_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    """Handle task failures, including worker-related ones."""
    if isinstance(exception, WorkerLostError):
        logger.error(f"Task {task_id} failed due to worker loss")

        # Requeue critical tasks
        if sender.name in ['critical_task', 'payment_processing']:
            requeue_critical_task.delay(task_id, sender.name)

    elif isinstance(exception, Terminated):
        logger.warning(f"Task {task_id} was terminated")

@app.task
def notify_ops_team(message):
    """Send notification to operations team."""
    # Send Slack message, email, etc.
    pass

@app.task
def requeue_critical_task(task_id, task_name):
    """Requeue critical tasks that failed due to worker issues."""
    # Logic to requeue the task
    pass
```
### Custom Exception Classes

```python
from celery.exceptions import CeleryError, Ignore, TaskError

class DataValidationError(TaskError):
    """Custom exception for data validation failures."""

    def __init__(self, field, value, message=None):
        self.field = field
        self.value = value
        self.message = message or f"Invalid {field}: {value}"
        super().__init__(self.message)

class ExternalServiceError(CeleryError):
    """Custom exception for external service failures."""

    def __init__(self, service_name, error_code, message=None):
        self.service_name = service_name
        self.error_code = error_code
        self.message = message or f"{service_name} error {error_code}"
        super().__init__(self.message)

@app.task(bind=True, max_retries=3)
def validate_and_process(self, user_data):
    """Task using custom exceptions."""
    try:
        # Validate data
        if not user_data.get('email'):
            raise DataValidationError('email', user_data.get('email'), 'Email is required')

        if '@' not in user_data['email']:
            raise DataValidationError('email', user_data['email'], 'Invalid email format')

        # Call external service
        response = call_external_api(user_data)

        if response.status_code != 200:
            raise ExternalServiceError('UserAPI', response.status_code, response.text)

        return response.json()

    except DataValidationError as exc:
        # Don't retry validation errors
        raise Ignore(f"Data validation failed: {exc}")

    except ExternalServiceError as exc:
        if exc.error_code >= 500:
            # Retry server errors
            raise self.retry(countdown=30, exc=exc)
        else:
            # Don't retry client errors
            raise Ignore(f"External service error: {exc}")

def call_external_api(data):
    """Simulate external API call."""
    class MockResponse:
        status_code = 200
        text = ''

        def json(self):
            return {'processed': True}

    return MockResponse()
```
### Comprehensive Error Recovery

```python
import logging

from celery.exceptions import (
    Ignore, MaxRetriesExceededError, Reject, SoftTimeLimitExceeded
)

logger = logging.getLogger(__name__)

@app.task(bind=True, max_retries=5, default_retry_delay=60)
def robust_task(self, operation_data):
    """Task with comprehensive error handling and recovery."""
    try:
        result = perform_operation(operation_data)
        return result

    except SoftTimeLimitExceeded:
        # Save progress and continue in new task
        save_checkpoint.delay(operation_data, 'timeout')
        return {'status': 'timeout', 'checkpoint_saved': True}

    except (ConnectionError, TimeoutError) as exc:
        # Network issues - retry with backoff
        countdown = min(2 ** self.request.retries * 60, 3600)  # Max 1 hour
        logger.warning(f"Network error, retrying in {countdown}s: {exc}")
        raise self.retry(countdown=countdown, exc=exc)

    except MemoryError:
        # Out of memory - reject and requeue (requeue only takes effect with acks_late=True)
        logger.error("Out of memory, rejecting task for requeue")
        raise Reject("Insufficient memory", requeue=True)

    except ValueError as exc:
        # Data errors - don't retry
        logger.error(f"Data validation error: {exc}")
        raise Ignore(f"Invalid data: {exc}")

    except MaxRetriesExceededError as exc:
        # All retries exhausted - send to dead letter queue
        logger.error(f"Task failed after {self.max_retries} retries")
        send_to_dead_letter.delay(self.request.id, operation_data, str(exc))
        raise

    except Exception as exc:
        # Unexpected errors - log and retry.
        # (SystemExit and KeyboardInterrupt derive from BaseException and are
        # never caught by this handler.)
        logger.exception(f"Unexpected error in task {self.request.id}")
        raise self.retry(exc=exc)

@app.task
def save_checkpoint(data, reason):
    """Save task checkpoint for recovery."""
    logger.info(f"Saving checkpoint due to {reason}")
    # Save to persistent storage

@app.task
def send_to_dead_letter(task_id, data, error):
    """Send failed task to dead letter queue for manual review."""
    logger.error(f"Sending task {task_id} to dead letter queue: {error}")
    # Store in dead letter queue/database

def perform_operation(data):
    """Simulate operation that might fail."""
    if not data:
        raise ValueError("No data provided")
    return f"processed_{data}"
```