0
# Signals and Events
1
2
Signal-based event system for monitoring task lifecycle, worker status, and system events. These hooks enable custom logging, monitoring, debugging, and integration with external systems throughout Celery's execution pipeline.
3
4
## Capabilities
5
6
### Task Lifecycle Signals
7
8
Signals fired during task execution phases, providing hooks for monitoring, logging, and custom behavior at each stage.
9
10
```python { .api }
11
# Task execution signals
12
before_task_publish = Signal()
13
after_task_publish = Signal()
14
task_prerun = Signal()
15
task_postrun = Signal()
16
task_success = Signal()
17
task_failure = Signal()
18
task_retry = Signal()
19
task_revoked = Signal()
20
task_received = Signal()
21
task_rejected = Signal()
22
task_unknown = Signal()
23
24
class Signal:
25
def connect(self, callback, sender=None, weak=True, dispatch_uid=None):
26
"""
27
Connect callback to signal.
28
29
Args:
30
callback (callable): Function to call when signal fires
31
sender: Signal sender filter (None for all senders)
32
weak (bool): Use weak references to callback
33
dispatch_uid: Unique identifier for this connection
34
"""
35
36
def disconnect(self, callback=None, sender=None, dispatch_uid=None):
37
"""
38
Disconnect callback from signal.
39
40
Args:
41
callback (callable): Callback to disconnect
42
sender: Sender filter
43
dispatch_uid: Connection identifier
44
45
Returns:
46
bool: True if disconnected
47
"""
48
49
def send(self, sender, **kwargs):
50
"""
51
Send signal to all connected callbacks.
52
53
Args:
54
sender: Signal sender
55
**kwargs: Signal data
56
57
Returns:
58
list: [(receiver, response), ...] for each callback
59
"""
60
61
def before_task_publish(sender=None, headers=None, body=None, routing_key=None, exchange=None, declare=None, retry_policy=None, **kwargs):
62
"""
63
Signal fired before task is published to broker.
64
65
Args:
66
sender: Publisher instance
67
headers (dict): Message headers
68
body (dict): Message body
69
routing_key (str): Message routing key
70
exchange (str): Exchange name
71
declare (list): Exchanges/queues to declare
72
retry_policy (dict): Retry configuration
73
"""
74
75
def after_task_publish(sender=None, headers=None, body=None, routing_key=None, exchange=None, **kwargs):
76
"""
77
Signal fired after task is published to broker.
78
79
Args:
80
sender: Publisher instance
81
headers (dict): Message headers
82
body (dict): Message body
83
routing_key (str): Message routing key
84
exchange (str): Exchange name
85
"""
86
87
def task_prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
88
"""
89
Signal fired before task execution.
90
91
Args:
92
sender: Task class
93
task_id (str): Task ID
94
task: Task instance
95
args (tuple): Task arguments
96
kwargs (dict): Task keyword arguments
97
"""
98
99
def task_postrun(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
100
"""
101
Signal fired after task execution.
102
103
Args:
104
sender: Task class
105
task_id (str): Task ID
106
task: Task instance
107
args (tuple): Task arguments
108
kwargs (dict): Task keyword arguments
109
retval: Task return value
110
state (str): Final task state
111
"""
112
113
def task_success(sender=None, result=None, **kwds):
114
"""
115
Signal fired when task succeeds.
116
117
Args:
118
sender: Task class
119
result: Task return value
120
"""
121
122
def task_failure(sender=None, task_id=None, exception=None, einfo=None, **kwds):
123
"""
124
Signal fired when task fails.
125
126
Args:
127
sender: Task class
128
task_id (str): Task ID
129
exception: Exception instance
130
einfo: Exception info object
131
"""
132
133
def task_retry(sender=None, task_id=None, reason=None, einfo=None, request=None, **kwds):
    """
    Signal fired when task is retried.

    Args:
        sender: Task being retried
        task_id: Not supplied by this signal (kept only so existing
            handlers keep working); read the ID from ``request.id``
        reason: Retry reason, usually the exception instance
        einfo: Exception info, including the traceback
        request: Current task request object
    """
143
144
def task_revoked(sender=None, request=None, terminated=None, signum=None, expired=None, **kwds):
145
"""
146
Signal fired when task is revoked.
147
148
Args:
149
sender: Task class
150
request: Task request object
151
terminated (bool): Process was terminated
152
signum (int): Signal number if terminated
153
expired (bool): Task expired
154
"""
155
156
def task_received(sender=None, request=None, **kwargs):
157
"""
158
Signal fired when worker receives task.
159
160
Args:
161
sender: Consumer instance
162
request: Task request object
163
"""
164
165
def task_rejected(sender=None, message=None, exc=None, **kwargs):
166
"""
167
Signal fired when task is rejected.
168
169
Args:
170
sender: Consumer instance
171
message: Rejected message
172
exc: Rejection exception
173
"""
174
175
def task_unknown(sender=None, message=None, exc=None, name=None, id=None, **kwargs):
176
"""
177
Signal fired when unknown task received.
178
179
Args:
180
sender: Consumer instance
181
message: Unknown message
182
exc: Exception raised
183
name (str): Unknown task name
184
id (str): Task ID
185
"""
186
```
187
188
### Worker Lifecycle Signals
189
190
Signals for monitoring worker startup, shutdown, and process management events.
191
192
```python { .api }
193
# Worker lifecycle signals
194
worker_init = Signal()
195
worker_process_init = Signal()
196
worker_process_shutdown = Signal()
197
worker_ready = Signal()
198
worker_shutdown = Signal()
199
worker_shutting_down = Signal()
200
celeryd_init = Signal()
201
celeryd_after_setup = Signal()
202
203
def worker_init(sender=None, **kwargs):
204
"""
205
Signal fired when worker initializes.
206
207
Args:
208
sender: Worker instance
209
"""
210
211
def worker_process_init(sender=None, **kwargs):
    """
    Signal fired in each pool child process when it starts.

    Args:
        sender: Always None for this signal; do not read attributes
            such as ``hostname`` or ``pid`` from it
    """
218
219
def worker_process_shutdown(sender=None, pid=None, exitcode=None, **kwargs):
    """
    Signal fired in each pool child process just before it exits.

    Args:
        sender: Always None for this signal
        pid (int): Process ID of the child process that is exiting
        exitcode (int): Exit code that will be used when the process exits
    """
228
229
def worker_ready(sender=None, **kwargs):
230
"""
231
Signal fired when worker is ready to receive tasks.
232
233
Args:
234
sender: Worker instance
235
"""
236
237
def worker_shutdown(sender=None, **kwargs):
238
"""
239
Signal fired when worker shuts down.
240
241
Args:
242
sender: Worker instance
243
"""
244
245
def worker_shutting_down(sender=None, **kwargs):
246
"""
247
Signal fired when worker begins shutdown.
248
249
Args:
250
sender: Worker instance
251
"""
252
253
def celeryd_init(sender=None, instance=None, conf=None, options=None, **kwargs):
    """
    Signal fired first when the worker (celeryd) starts.

    Args:
        sender (str): Node name of the worker, so handlers can be
            connected per worker with ``connect(sender='worker@host')``
        instance: Worker instance being initialized
        conf: Configuration object of the current app
        options (dict): Options passed to the worker from the command line
    """
262
263
def celeryd_after_setup(sender=None, instance=None, conf=None, **kwargs):
    """
    Signal fired after the worker instance is set up, before it runs.

    Args:
        sender (str): Node name of the worker
        instance: Worker instance that was set up
        conf: Configuration object of the current app
    """
272
```
273
274
### Beat Scheduler Signals
275
276
Signals for monitoring periodic task scheduler events and beat service lifecycle.
277
278
```python { .api }
279
# Beat scheduler signals
280
beat_init = Signal()
281
beat_embedded_init = Signal()
282
283
def beat_init(sender=None, **kwargs):
284
"""
285
Signal fired when beat scheduler initializes.
286
287
Args:
288
sender: Beat service instance
289
"""
290
291
def beat_embedded_init(sender=None, **kwargs):
292
"""
293
Signal fired when embedded beat initializes.
294
295
Args:
296
sender: Beat service instance
297
"""
298
```
299
300
### Logging and Setup Signals
301
302
Signals for customizing logging configuration and system setup procedures.
303
304
```python { .api }
305
# Logging signals
306
setup_logging = Signal()
307
after_setup_logger = Signal()
308
after_setup_task_logger = Signal()
309
310
def setup_logging(sender=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
311
"""
312
Signal fired during logging setup.
313
314
Args:
315
sender: Logging setup caller
316
loglevel (int): Log level
317
logfile (str): Log file path
318
format (str): Log format string
319
colorize (bool): Enable colored output
320
"""
321
322
def after_setup_logger(sender=None, logger=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
323
"""
324
Signal fired after logger setup.
325
326
Args:
327
sender: Logger setup caller
328
logger: Logger instance
329
loglevel (int): Log level
330
logfile (str): Log file path
331
format (str): Log format
332
colorize (bool): Colored output enabled
333
"""
334
335
def after_setup_task_logger(sender=None, logger=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
336
"""
337
Signal fired after task logger setup.
338
339
Args:
340
sender: Task logger setup caller
341
logger: Task logger instance
342
loglevel (int): Log level
343
logfile (str): Log file path
344
format (str): Log format
345
colorize (bool): Colored output enabled
346
"""
347
```
348
349
### Signal Connection Decorators
350
351
Convenience decorators for connecting signal handlers to specific events.
352
353
```python { .api }
354
def receiver(signal, **kwargs):
355
"""
356
Decorator for connecting signal handlers.
357
358
Args:
359
signal: Signal instance to connect to
360
**kwargs: Connection options (sender, dispatch_uid, etc.)
361
362
Returns:
363
Decorator function
364
"""
365
366
# Usage patterns
367
@receiver(task_success)
368
def task_success_handler(sender=None, result=None, **kwargs):
369
"""Handle task success."""
370
pass
371
372
@receiver(task_failure)
373
def task_failure_handler(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
374
"""Handle task failure."""
375
pass
376
```
377
378
## Usage Examples
379
380
### Basic Signal Handlers
381
382
```python
383
from celery import Celery
384
from celery.signals import (
385
task_prerun, task_postrun, task_success, task_failure,
386
worker_ready, worker_shutdown
387
)
388
389
app = Celery('signal_example')
390
391
@task_prerun.connect
392
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
393
"""Log task start."""
394
print(f'Task {task.name}[{task_id}] starting with args={args}, kwargs={kwargs}')
395
396
@task_postrun.connect
397
def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
398
"""Log task completion."""
399
print(f'Task {task.name}[{task_id}] finished with state={state}, result={retval}')
400
401
@task_success.connect
402
def task_success_handler(sender=None, result=None, **kwargs):
403
"""Handle successful task completion."""
404
print(f'Task {sender.name} succeeded with result: {result}')
405
406
@task_failure.connect
407
def task_failure_handler(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
408
"""Handle task failure."""
409
print(f'Task {sender.name}[{task_id}] failed: {exception}')
410
print(f'Traceback: {einfo.traceback}')
411
412
@worker_ready.connect
413
def worker_ready_handler(sender=None, **kwargs):
414
"""Log when worker becomes ready."""
415
print(f'Worker {sender.hostname} is ready to receive tasks')
416
417
@worker_shutdown.connect
418
def worker_shutdown_handler(sender=None, **kwargs):
419
"""Log worker shutdown."""
420
print(f'Worker {sender.hostname} is shutting down')
421
```
422
423
### Advanced Monitoring and Metrics
424
425
```python
426
import time
427
import json
428
from datetime import datetime
429
from celery.signals import (
430
before_task_publish, after_task_publish,
431
task_prerun, task_postrun, task_success, task_failure,
432
worker_process_init
433
)
434
435
# Task execution metrics
436
task_metrics = {
437
'published': 0,
438
'started': 0,
439
'completed': 0,
440
'failed': 0,
441
'total_execution_time': 0
442
}
443
444
@before_task_publish.connect
445
def track_task_published(sender=None, headers=None, body=None, **kwargs):
446
"""Track task publication."""
447
task_metrics['published'] += 1
448
task_name = headers.get('task', 'unknown')
449
print(f'Publishing task: {task_name}')
450
451
@task_prerun.connect
452
def track_task_start(sender=None, task_id=None, task=None, **kwargs):
453
"""Track task execution start."""
454
task_metrics['started'] += 1
455
456
# Store start time for duration calculation
457
task.request.start_time = time.time()
458
459
print(f'Starting task {task.name}[{task_id}] at {datetime.now()}')
460
461
@task_postrun.connect
462
def track_task_completion(sender=None, task_id=None, task=None, state=None, **kwargs):
463
"""Track task completion and calculate duration."""
464
task_metrics['completed'] += 1
465
466
# Calculate execution time
467
if hasattr(task.request, 'start_time'):
468
duration = time.time() - task.request.start_time
469
task_metrics['total_execution_time'] += duration
470
print(f'Task {task.name}[{task_id}] completed in {duration:.2f}s with state {state}')
471
472
@task_failure.connect
473
def track_task_failure(sender=None, task_id=None, exception=None, **kwargs):
474
"""Track task failures."""
475
task_metrics['failed'] += 1
476
print(f'Task {sender.name}[{task_id}] failed: {type(exception).__name__}: {exception}')
477
478
def print_metrics():
479
"""Print current metrics."""
480
print("\n=== Task Metrics ===")
481
print(f"Published: {task_metrics['published']}")
482
print(f"Started: {task_metrics['started']}")
483
print(f"Completed: {task_metrics['completed']}")
484
print(f"Failed: {task_metrics['failed']}")
485
if task_metrics['completed'] > 0:
486
avg_time = task_metrics['total_execution_time'] / task_metrics['completed']
487
print(f"Average execution time: {avg_time:.2f}s")
488
489
@worker_process_init.connect
def setup_worker_monitoring(sender=None, **kwargs):
    """Setup worker-specific monitoring.

    Args:
        sender: None for worker_process_init, so the process ID is taken
            from the operating system instead of from the sender
    """
    import os  # local import: this snippet does not import os at the top

    print(f'Worker process initialized with PID {os.getpid()}')
493
```
494
495
### Custom Logging Integration
496
497
```python
498
import logging
499
from celery.signals import (
500
setup_logging, after_setup_logger,
501
task_prerun, task_postrun, task_failure
502
)
503
504
# Custom logger setup
505
@setup_logging.connect
506
def setup_custom_logging(sender=None, loglevel=None, logfile=None, **kwargs):
507
"""Setup custom logging configuration."""
508
509
# Configure root logger
510
logging.basicConfig(
511
level=loglevel,
512
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
513
handlers=[
514
logging.FileHandler(logfile or 'celery.log'),
515
logging.StreamHandler()
516
]
517
)
518
519
# Disable default Celery logging setup
520
return True
521
522
@after_setup_logger.connect
523
def configure_task_logger(sender=None, logger=None, **kwargs):
524
"""Configure task-specific logger."""
525
526
# Add custom handler for task logs
527
task_handler = logging.FileHandler('tasks.log')
528
task_handler.setFormatter(logging.Formatter(
529
'%(asctime)s [%(levelname)s] %(name)s: %(message)s'
530
))
531
532
logger.addHandler(task_handler)
533
logger.info('Custom task logger configured')
534
535
# Task-specific logging
536
@task_prerun.connect
537
def log_task_start(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
538
"""Log task start with context."""
539
logger = logging.getLogger(f'tasks.{task.name}')
540
logger.info(f'Starting task {task_id} with args={args}, kwargs={kwargs}')
541
542
@task_postrun.connect
543
def log_task_end(sender=None, task_id=None, task=None, retval=None, state=None, **kwds):
544
"""Log task completion."""
545
logger = logging.getLogger(f'tasks.{task.name}')
546
logger.info(f'Task {task_id} finished with state={state}')
547
548
@task_failure.connect
def log_task_error(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
    """Log task errors with full context.

    Args:
        sender: Task that failed
        task_id (str): ID of the failed task
        exception: Exception instance raised by the task
        einfo: Exception info object (not used for logging here)
    """
    logger = logging.getLogger(f'tasks.{sender.name}')
    # logging's exc_info accepts an exception instance or a sys.exc_info()
    # tuple; pass the exception itself so the traceback is actually rendered.
    logger.error(f'Task {task_id} failed: {exception}', exc_info=exception)
553
```
554
555
### External System Integration
556
557
```python
558
import json
from datetime import datetime

import requests
from celery.signals import task_success, task_failure, worker_ready
561
562
# Webhook notifications
563
WEBHOOK_URL = 'https://monitoring.example.com/webhook'
564
565
@task_success.connect
def notify_task_success(sender=None, result=None, **kwargs):
    """Send webhook notification on task success.

    Args:
        sender: Task that succeeded
        result: Task return value
    """
    payload = {
        'event': 'task_success',
        'task_name': sender.name,
        # task_success only provides `result`; the task ID lives on the
        # sender's current request, not in the signal keyword arguments.
        'task_id': sender.request.id,
        'result': str(result),
        'timestamp': datetime.now().isoformat()
    }

    # Best effort: a failing webhook must not affect the task itself.
    try:
        requests.post(WEBHOOK_URL, json=payload, timeout=5)
    except Exception as e:
        print(f'Failed to send webhook: {e}')
581
582
@task_failure.connect
583
def notify_task_failure(sender=None, task_id=None, exception=None, **kwargs):
584
"""Send alert on task failure."""
585
586
payload = {
587
'event': 'task_failure',
588
'task_name': sender.name,
589
'task_id': task_id,
590
'error': str(exception),
591
'error_type': type(exception).__name__,
592
'timestamp': datetime.now().isoformat(),
593
'severity': 'high'
594
}
595
596
try:
597
requests.post(WEBHOOK_URL, json=payload, timeout=5)
598
except Exception as e:
599
print(f'Failed to send failure alert: {e}')
600
601
# Metrics collection
602
@worker_ready.connect
603
def register_worker(sender=None, **kwargs):
604
"""Register worker with monitoring system."""
605
606
payload = {
607
'event': 'worker_ready',
608
'hostname': sender.hostname,
609
'pid': getattr(sender, 'pid', None),
610
'timestamp': datetime.now().isoformat()
611
}
612
613
try:
614
requests.post(f'{WEBHOOK_URL}/workers', json=payload, timeout=5)
615
except Exception as e:
616
print(f'Failed to register worker: {e}')
617
```
618
619
### Signal-Based Task Routing
620
621
```python
622
from celery.signals import before_task_publish
623
624
@before_task_publish.connect
def route_priority_tasks(sender=None, headers=None, body=None, routing_key=None, **kwargs):
    """Tag outgoing tasks with a routing hint based on priority.

    Args:
        sender: Name of the task being published
        headers (dict): Message headers (mutated in place)
        body: Message body; an ``(args, kwargs, embed)`` tuple under task
            protocol 2, a dict with a ``'kwargs'`` key under protocol 1
        routing_key (str): Routing key the message is published with

    NOTE(review): routing_key is passed by value, so writing
    headers['routing_key'] only records a hint in the headers; it does not
    change the queue the message is delivered to. Use the ``task_routes``
    setting or ``apply_async(queue=...)`` for real routing -- confirm the
    intended usage of this example.
    """
    if headers is None:
        headers = {}
    # Protocol 1 messages carry no 'task' header; the sender is the task name.
    task_name = headers.get('task') or sender

    # Extract the task keyword arguments without assuming the body shape.
    if isinstance(body, dict):
        task_kwargs = body.get('kwargs') or {}
    elif isinstance(body, (tuple, list)) and len(body) > 1:
        task_kwargs = body[1] or {}
    else:
        task_kwargs = {}
    priority = task_kwargs.get('priority', 'normal')

    # Route high priority tasks to dedicated queue
    if priority == 'high':
        headers['routing_key'] = 'high_priority'
        print(f'Routing {task_name} to high priority queue')

    # Route long-running tasks to separate workers
    elif task_name in ('process_large_file', 'generate_report'):
        headers['routing_key'] = 'long_running'
        print(f'Routing {task_name} to long running queue')
640
```
641
642
### Debugging and Development
643
644
```python
645
from celery.signals import *
646
647
# Debug signal that logs all task activity
648
@task_prerun.connect
@task_postrun.connect
@task_success.connect
@task_failure.connect
@task_retry.connect
@task_revoked.connect
def debug_task_signals(sender=None, signal=None, **kwargs):
    """Log all task signals for debugging.

    Args:
        sender: Object that sent the signal
        signal: Signal instance being dispatched; Celery passes it to
            every receiver as the ``signal`` keyword argument
        **kwargs: Remaining signal arguments
    """
    import json  # local import: this snippet only star-imports celery.signals

    # The calling frame is the signal dispatcher, so stack inspection cannot
    # recover which signal fired; use the signal object's own name instead.
    signal_name = getattr(signal, 'name', None) or str(signal)

    debug_info = {
        'signal': signal_name,
        'task': getattr(sender, 'name', str(sender)) if sender else None,
        # einfo is skipped to keep the output short.
        'data': {k: str(v) for k, v in kwargs.items() if k != 'einfo'}
    }

    print(f'DEBUG SIGNAL: {json.dumps(debug_info, indent=2)}')
667
668
# Performance monitoring
669
task_times = {}
670
671
@task_prerun.connect
def start_timer(sender=None, task_id=None, **kwargs):
    """Start timing task execution.

    Args:
        sender: Task about to run
        task_id (str): ID used as the key in ``task_times``
    """
    import time  # local import: this snippet does not import time at the top

    task_times[task_id] = time.time()
675
676
@task_postrun.connect
def end_timer(sender=None, task_id=None, **kwargs):
    """End timing and log duration.

    Args:
        sender: Task that finished
        task_id (str): ID used as the key in ``task_times``
    """
    import time  # local import: this snippet does not import time at the top

    # Pop first so the entry is removed even if formatting the message fails.
    started_at = task_times.pop(task_id, None)
    if started_at is not None:
        duration = time.time() - started_at
        print(f'Task {sender.name}[{task_id}] took {duration:.3f}s')
683
```
684
685
### Conditional Signal Handlers
686
687
```python
688
from celery.signals import task_failure
689
import os
690
691
# Only handle failures in production
692
@task_failure.connect
693
def production_error_handler(sender=None, task_id=None, exception=None, **kwargs):
694
"""Handle errors differently in production vs development."""
695
696
if os.environ.get('ENVIRONMENT') == 'production':
697
# Send to error tracking service
698
send_to_sentry(exception, task_id, sender.name)
699
700
# Page on-call engineer for critical tasks
701
if sender.name in ['process_payment', 'send_notification']:
702
page_oncall_engineer(f'Critical task {sender.name} failed: {exception}')
703
else:
704
# Just log in development
705
print(f'DEV: Task {sender.name}[{task_id}] failed: {exception}')
706
707
def send_to_sentry(exception, task_id, task_name):
708
"""Send error to Sentry."""
709
# Sentry integration code
710
pass
711
712
def page_oncall_engineer(message):
713
"""Send alert to on-call engineer."""
714
# Alerting system integration
715
pass
716
```