Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines
—
Quality: Pending — not yet reviewed against best practices.
Impact: Pending — no eval scenarios have been run.
Signal-based event system for monitoring task lifecycle, worker status, and system events. These hooks enable custom logging, monitoring, debugging, and integration with external systems throughout Celery's execution pipeline.
Signals fired during task execution phases, providing hooks for monitoring, logging, and custom behavior at each stage.
# Task execution signals
# Module-level Signal instances covering the full task lifecycle, from
# publish (before/after) through execution (prerun/postrun) to terminal
# outcomes (success/failure/retry/revoked) and consumer-side events
# (received/rejected/unknown).
# NOTE(review): these are instantiated before the `class Signal` definition
# that follows in this document — in a real module the class must be defined
# (or imported) before these lines run.
before_task_publish = Signal()
after_task_publish = Signal()
task_prerun = Signal()
task_postrun = Signal()
task_success = Signal()
task_failure = Signal()
task_retry = Signal()
task_revoked = Signal()
task_received = Signal()
task_rejected = Signal()
task_unknown = Signal()
class Signal:
    """A minimal synchronous signal (observer) implementation.

    Receivers register via :meth:`connect` and are invoked, in registration
    order, each time :meth:`send` fires the signal.  The original document
    declared these methods as empty stubs; this implements the documented
    contract.
    """

    def __init__(self):
        # Each entry is (dispatch_key, sender_filter, receiver_ref) where
        # receiver_ref is a weakref.ref when connected with weak=True,
        # otherwise the callback object itself.
        self._receivers = []

    @staticmethod
    def _dispatch_key(callback, dispatch_uid):
        # dispatch_uid, when given, overrides identity-based deduplication.
        return dispatch_uid if dispatch_uid is not None else id(callback)

    def connect(self, callback, sender=None, weak=True, dispatch_uid=None):
        """
        Connect callback to signal.

        Args:
            callback (callable): Function to call when signal fires
            sender: Signal sender filter (None for all senders)
            weak (bool): Use weak references to callback
            dispatch_uid: Unique identifier for this connection
        """
        import weakref
        key = self._dispatch_key(callback, dispatch_uid)
        # Re-connecting with the same key and sender replaces the old entry,
        # so one handler is never invoked twice for a single send().
        self._receivers = [
            entry for entry in self._receivers
            if not (entry[0] == key and entry[1] is sender)
        ]
        # NOTE(review): a weakref to a bound method dies immediately; callers
        # connecting bound methods should pass weak=False — confirm usage.
        ref = weakref.ref(callback) if weak else callback
        self._receivers.append((key, sender, ref))

    def disconnect(self, callback=None, sender=None, dispatch_uid=None):
        """
        Disconnect callback from signal.

        Args:
            callback (callable): Callback to disconnect
            sender: Sender filter
            dispatch_uid: Connection identifier

        Returns:
            bool: True if disconnected
        """
        if callback is None and dispatch_uid is None:
            return False  # nothing identifies a connection to remove
        key = self._dispatch_key(callback, dispatch_uid)
        kept = [
            entry for entry in self._receivers
            if not (entry[0] == key and entry[1] is sender)
        ]
        removed = len(kept) < len(self._receivers)
        self._receivers = kept
        return removed

    def send(self, sender, **kwargs):
        """
        Send signal to all connected callbacks.

        Args:
            sender: Signal sender
            **kwargs: Signal data

        Returns:
            list: [(receiver, response), ...] for each callback
        """
        import weakref
        responses = []
        for _key, sender_filter, ref in list(self._receivers):
            # A non-None sender filter restricts the receiver to one sender.
            if sender_filter is not None and sender_filter is not sender:
                continue
            callback = ref() if isinstance(ref, weakref.ReferenceType) else ref
            if callback is None:
                continue  # weakly-referenced callback has been collected
            responses.append((callback, callback(sender=sender, **kwargs)))
        return responses
def before_task_publish(sender=None, headers=None, body=None, routing_key=None, exchange=None, declare=None, retry_policy=None, **kwargs):
    """Handler signature for the signal dispatched just before a task
    message is sent to the broker.

    Args:
        sender: Publisher instance dispatching the signal
        headers (dict): Outgoing message headers
        body (dict): Outgoing message body
        routing_key (str): Routing key used for the message
        exchange (str): Name of the target exchange
        declare (list): Exchanges/queues that will be declared
        retry_policy (dict): Publish retry configuration
    """
def after_task_publish(sender=None, headers=None, body=None, routing_key=None, exchange=None, **kwargs):
    """Handler signature for the signal dispatched once a task message has
    been published to the broker.

    Args:
        sender: Publisher instance dispatching the signal
        headers (dict): Headers of the published message
        body (dict): Body of the published message
        routing_key (str): Routing key the message was sent with
        exchange (str): Exchange the message was sent to
    """
def task_prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """Handler signature for the signal dispatched immediately before a
    task body executes.

    Args:
        sender: Task class about to run
        task_id (str): Unique id of this task invocation
        task: The task object itself
        args (tuple): Positional arguments for the task
        kwargs (dict): Keyword arguments for the task
    """
def task_postrun(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    """Handler signature for the signal dispatched after a task body has
    finished executing (regardless of outcome).

    Args:
        sender: Task class that ran
        task_id (str): Unique id of this task invocation
        task: The task object itself
        args (tuple): Positional arguments the task received
        kwargs (dict): Keyword arguments the task received
        retval: Value returned by the task
        state (str): Final task state
    """
def task_success(sender=None, result=None, **kwds):
    """Handler signature for the signal dispatched when a task completes
    successfully.

    Args:
        sender: Task class that succeeded
        result: Value returned by the task
    """
def task_failure(sender=None, task_id=None, exception=None, einfo=None, **kwds):
    """Handler signature for the signal dispatched when a task raises an
    unhandled exception.

    Args:
        sender: Task class that failed
        task_id (str): Unique id of the failed invocation
        exception: The exception instance that was raised
        einfo: Exception info object with traceback details
    """
def task_retry(sender=None, task_id=None, reason=None, einfo=None, **kwds):
    """Handler signature for the signal dispatched when a task is scheduled
    to be retried.

    Args:
        sender: Task class being retried
        task_id (str): Unique id of the retried invocation
        reason: Reason or exception that triggered the retry
        einfo: Exception info object, when available
    """
def task_revoked(sender=None, request=None, terminated=None, signum=None, expired=None, **kwds):
    """Handler signature for the signal dispatched when a task is revoked
    or terminated.

    Args:
        sender: Task class that was revoked
        request: The task request object
        terminated (bool): Whether the worker process was terminated
        signum (int): Signal number used for termination, if any
        expired (bool): Whether the task expired rather than being revoked
    """
def task_received(sender=None, request=None, **kwargs):
    """Handler signature for the signal dispatched when a worker's consumer
    receives a task message.

    Args:
        sender: Consumer instance that received the task
        request: The task request object
    """
def task_rejected(sender=None, message=None, exc=None, **kwargs):
    """Handler signature for the signal dispatched when a worker rejects a
    task message.

    Args:
        sender: Consumer instance that rejected the message
        message: The rejected message
        exc: Exception explaining the rejection
    """
def task_unknown(sender=None, message=None, exc=None, name=None, id=None, **kwargs):
    """Handler signature for the signal dispatched when a worker receives a
    message for a task it does not know.

    Args:
        sender: Consumer instance that received the message
        message: The unrecognized message
        exc: Exception raised while handling it
        name (str): Name of the unknown task
        id (str): Task id carried by the message
    """

# Signals for monitoring worker startup, shutdown, and process management events.
# Worker lifecycle signals
# Module-level Signal instances covering worker startup (init/process_init),
# readiness, and the shutdown sequence (shutting_down -> shutdown ->
# process_shutdown), plus the celeryd daemon setup hooks.
worker_init = Signal()
worker_process_init = Signal()
worker_process_shutdown = Signal()
worker_ready = Signal()
worker_shutdown = Signal()
worker_shutting_down = Signal()
celeryd_init = Signal()
celeryd_after_setup = Signal()
def worker_init(sender=None, **kwargs):
    """Handler signature for the signal dispatched while the worker is
    initializing.

    Args:
        sender: Worker instance being initialized
    """
def worker_process_init(sender=None, **kwargs):
    """Handler signature for the signal dispatched when an individual worker
    child process initializes.

    Args:
        sender: Worker instance whose process is starting
    """
def worker_process_shutdown(sender=None, pid=None, exitcode=None, **kwargs):
    """Handler signature for the signal dispatched when a worker child
    process shuts down.

    Args:
        sender: Worker instance whose process is stopping
        pid (int): Id of the exiting process
        exitcode (int): Exit code of the process
    """
def worker_ready(sender=None, **kwargs):
    """Handler signature for the signal dispatched once the worker is ready
    to accept tasks.

    Args:
        sender: Worker instance that became ready
    """
def worker_shutdown(sender=None, **kwargs):
    """Handler signature for the signal dispatched when the worker shuts
    down.

    Args:
        sender: Worker instance shutting down
    """
def worker_shutting_down(sender=None, **kwargs):
    """Handler signature for the signal dispatched when the worker begins
    its shutdown sequence.

    Args:
        sender: Worker instance starting to shut down
    """
def celeryd_init(sender=None, instance=None, conf=None, **kwargs):
    """Handler signature for the signal dispatched when the celery daemon
    initializes.

    Args:
        sender: Worker class
        instance: The worker instance being created
        conf: The active configuration object
    """
def celeryd_after_setup(sender=None, instance=None, conf=None, **kwargs):
    """Handler signature for the signal dispatched after the celery daemon
    has completed its setup.

    Args:
        sender: Worker class
        instance: The configured worker instance
        conf: The active configuration object
    """

# Signals for monitoring periodic task scheduler events and beat service lifecycle.
# Beat scheduler signals
# Module-level Signal instances for the beat periodic-task scheduler,
# covering both the standalone and embedded (in-worker) startup paths.
beat_init = Signal()
beat_embedded_init = Signal()
def beat_init(sender=None, **kwargs):
    """Handler signature for the signal dispatched when the beat scheduler
    initializes.

    Args:
        sender: Beat service instance being initialized
    """
def beat_embedded_init(sender=None, **kwargs):
    """Handler signature for the signal dispatched when an embedded beat
    scheduler initializes inside a worker.

    Args:
        sender: Beat service instance being initialized
    """

# Signals for customizing logging configuration and system setup procedures.
# Logging signals
# Module-level Signal instances for hooking into logging configuration:
# setup_logging replaces the default setup entirely when handled; the
# after_setup_* signals customize loggers once they exist.
setup_logging = Signal()
after_setup_logger = Signal()
after_setup_task_logger = Signal()
def setup_logging(sender=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
    """Handler signature for the signal dispatched while logging is being
    set up.

    Args:
        sender: Caller performing the logging setup
        loglevel (int): Requested log level
        logfile (str): Path of the log file, if any
        format (str): Log format string
        colorize (bool): Whether colored output was requested
    """
def after_setup_logger(sender=None, logger=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
    """Handler signature for the signal dispatched after the global logger
    has been configured.

    Args:
        sender: Caller that performed the setup
        logger: The configured logger instance
        loglevel (int): Effective log level
        logfile (str): Path of the log file, if any
        format (str): Effective log format
        colorize (bool): Whether colored output is enabled
    """
def after_setup_task_logger(sender=None, logger=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
    """Handler signature for the signal dispatched after the per-task logger
    has been configured.

    Args:
        sender: Caller that performed the setup
        logger: The configured task logger instance
        loglevel (int): Effective log level
        logfile (str): Path of the log file, if any
        format (str): Effective log format
        colorize (bool): Whether colored output is enabled
    """

# Convenience decorators for connecting signal handlers to specific events.
def receiver(signal, **kwargs):
    """
    Decorator for connecting signal handlers.

    The original document declared this as an empty stub even though it is
    documented to return a decorator; this implements that contract.

    Args:
        signal: Signal instance (or list/tuple of signals) to connect to
        **kwargs: Connection options forwarded to connect()
            (sender, dispatch_uid, etc.)

    Returns:
        Decorator that connects the wrapped function to the signal(s) and
        returns the function unchanged.
    """
    def _decorator(func):
        # Accept either one signal or an iterable of signals, mirroring
        # the django.dispatch receiver() convention.
        signals = signal if isinstance(signal, (list, tuple)) else (signal,)
        for sig in signals:
            sig.connect(func, **kwargs)
        return func
    return _decorator
# Usage patterns
# Example: connect a handler using the @receiver decorator instead of
# calling signal.connect() directly.
@receiver(task_success)
def task_success_handler(sender=None, result=None, **kwargs):
    """Handle task success.

    Example receiver; the body is intentionally empty.
    """
    pass
@receiver(task_failure)
def task_failure_handler(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
    """Handle task failure.

    Example receiver; the body is intentionally empty.
    """
    pass

from celery import Celery
from celery.signals import (
task_prerun, task_postrun, task_success, task_failure,
worker_ready, worker_shutdown
)
# Example application instance used by the signal handlers below.
app = Celery('signal_example')
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """Log task start.

    Prints the task name, id and call arguments just before the task
    body runs.
    """
    print(f'Task {task.name}[{task_id}] starting with args={args}, kwargs={kwargs}')
@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    """Log task completion.

    Prints the final state and return value once the task body finishes
    (task_postrun fires for failures as well as successes).
    """
    print(f'Task {task.name}[{task_id}] finished with state={state}, result={retval}')
@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    """Handle successful task completion.

    For task_success the sender is the task itself, hence sender.name.
    """
    print(f'Task {sender.name} succeeded with result: {result}')
@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
    """Handle task failure.

    Prints the exception and the formatted traceback carried by einfo.
    """
    print(f'Task {sender.name}[{task_id}] failed: {exception}')
    print(f'Traceback: {einfo.traceback}')
@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    """Log when worker becomes ready.

    NOTE(review): assumes the sender exposes a hostname attribute — confirm
    for the worker consumer object this signal passes.
    """
    print(f'Worker {sender.hostname} is ready to receive tasks')
@worker_shutdown.connect
def worker_shutdown_handler(sender=None, **kwargs):
    """Log worker shutdown."""
    host = sender.hostname
    print(f'Worker {host} is shutting down')

import time
import json
from datetime import datetime
from celery.signals import (
before_task_publish, after_task_publish,
task_prerun, task_postrun, task_success, task_failure,
worker_process_init
)
# Task execution metrics
# Module-level counters shared by the signal handlers below.
# 'total_execution_time' accumulates seconds so an average can be derived.
# NOTE(review): plain dict updates are not atomic across threads — fine for
# prefork worker pools, verify for threaded/gevent pools.
task_metrics = {
    'published': 0,
    'started': 0,
    'completed': 0,
    'failed': 0,
    'total_execution_time': 0
}
@before_task_publish.connect
def track_task_published(sender=None, headers=None, body=None, **kwargs):
    """Track task publication.

    Increments the 'published' counter and logs the task name.
    """
    task_metrics['published'] += 1
    # NOTE(review): reading the task name from headers assumes task
    # protocol 2; under protocol 1 the name lives in the body — confirm.
    task_name = headers.get('task', 'unknown')
    print(f'Publishing task: {task_name}')
@task_prerun.connect
def track_task_start(sender=None, task_id=None, task=None, **kwargs):
    """Track task execution start.

    Increments the 'started' counter and stamps the task request with a
    start time consumed later by track_task_completion().
    """
    task_metrics['started'] += 1
    # Store start time for duration calculation
    task.request.start_time = time.time()
    print(f'Starting task {task.name}[{task_id}] at {datetime.now()}')
@task_postrun.connect
def track_task_completion(sender=None, task_id=None, task=None, state=None, **kwargs):
    """Track task completion and calculate duration.

    NOTE(review): task_postrun also fires for failed tasks, so 'completed'
    counts every finished task regardless of outcome — confirm intended.
    """
    task_metrics['completed'] += 1
    # Calculate execution time
    # (start_time is set by track_task_start; guard in case that handler
    # did not run for this task)
    if hasattr(task.request, 'start_time'):
        duration = time.time() - task.request.start_time
        task_metrics['total_execution_time'] += duration
        print(f'Task {task.name}[{task_id}] completed in {duration:.2f}s with state {state}')
@task_failure.connect
def track_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    """Track task failures.

    Increments the 'failed' counter and logs the exception type and message.
    """
    task_metrics['failed'] += 1
    print(f'Task {sender.name}[{task_id}] failed: {type(exception).__name__}: {exception}')
def print_metrics():
    """Print current metrics.

    Reports the counters accumulated in the module-level task_metrics dict,
    plus a derived average execution time when any task has completed.
    """
    m = task_metrics
    completed = m['completed']
    print("\n=== Task Metrics ===")
    print(f"Published: {m['published']}")
    print(f"Started: {m['started']}")
    print(f"Completed: {completed}")
    print(f"Failed: {m['failed']}")
    if completed > 0:
        avg_time = m['total_execution_time'] / completed
        print(f"Average execution time: {avg_time:.2f}s")
@worker_process_init.connect
def setup_worker_monitoring(sender=None, **kwargs):
    """Setup worker-specific monitoring."""
    # NOTE(review): assumes sender exposes hostname and pid — confirm what
    # object worker_process_init actually passes as sender.
    print(f'Worker process {sender.hostname} initialized with PID {sender.pid}')

import logging
from celery.signals import (
setup_logging, after_setup_logger,
task_prerun, task_postrun, task_failure
)
# Custom logger setup
@setup_logging.connect
def setup_custom_logging(sender=None, loglevel=None, logfile=None, **kwargs):
    """Setup custom logging configuration.

    Connecting any handler to setup_logging causes Celery to skip its own
    logging configuration, so this handler takes full responsibility for
    configuring the root logger.
    """
    handlers = [
        logging.FileHandler(logfile or 'celery.log'),
        logging.StreamHandler(),
    ]
    # Configure root logger
    # NOTE(review): basicConfig is a no-op if the root logger already has
    # handlers — confirm this runs before any other logging setup.
    logging.basicConfig(
        level=loglevel,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=handlers,
    )
    # Disable default Celery logging setup
    return True
@after_setup_logger.connect
def configure_task_logger(sender=None, logger=None, **kwargs):
    """Configure task-specific logger.

    NOTE(review): adds a new FileHandler on every invocation — if the signal
    fires more than once, log lines will be duplicated. Confirm.
    """
    # Add custom handler for task logs
    task_handler = logging.FileHandler('tasks.log')
    task_handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
    ))
    logger.addHandler(task_handler)
    logger.info('Custom task logger configured')
# Task-specific logging
@task_prerun.connect
def log_task_start(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """Log task start with context.

    Uses a per-task child logger named 'tasks.<task name>'.
    """
    logger = logging.getLogger(f'tasks.{task.name}')
    logger.info(f'Starting task {task_id} with args={args}, kwargs={kwargs}')
@task_postrun.connect
def log_task_end(sender=None, task_id=None, task=None, retval=None, state=None, **kwds):
    """Log task completion.

    retval is accepted for the signal's signature but not logged here.
    """
    logger = logging.getLogger(f'tasks.{task.name}')
    logger.info(f'Task {task_id} finished with state={state}')
@task_failure.connect
def log_task_error(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
    """Log task errors with full context."""
    # NOTE(review): einfo is an ExceptionInfo-like object rather than a
    # (type, value, traceback) tuple — confirm logging accepts it as
    # exc_info in the target versions.
    logger = logging.getLogger(f'tasks.{sender.name}')
    logger.error(f'Task {task_id} failed: {exception}', exc_info=einfo)

import json
import requests
from celery.signals import task_success, task_failure, worker_ready
# Webhook notifications
# Endpoint that receives the JSON event payloads posted by the handlers below.
WEBHOOK_URL = 'https://monitoring.example.com/webhook'
@task_success.connect
def notify_task_success(sender=None, result=None, **kwargs):
    """Send webhook notification on task success.

    Best-effort: posting failures are printed, never raised, so the worker
    is not disturbed.

    NOTE(review): task_success is not documented to pass task_id, so
    kwargs.get('task_id') is likely always None — confirm. datetime comes
    from an import earlier in this document.
    """
    payload = {
        'event': 'task_success',
        'task_name': sender.name,
        'task_id': kwargs.get('task_id'),
        'result': str(result),
        'timestamp': datetime.now().isoformat()
    }
    try:
        requests.post(WEBHOOK_URL, json=payload, timeout=5)
    except Exception as e:
        print(f'Failed to send webhook: {e}')
@task_failure.connect
def notify_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    """Send alert on task failure.

    Posts a high-severity event with the error type and message; posting
    failures are printed, never raised.
    """
    payload = {
        'event': 'task_failure',
        'task_name': sender.name,
        'task_id': task_id,
        'error': str(exception),
        'error_type': type(exception).__name__,
        'timestamp': datetime.now().isoformat(),
        'severity': 'high'
    }
    try:
        requests.post(WEBHOOK_URL, json=payload, timeout=5)
    except Exception as e:
        print(f'Failed to send failure alert: {e}')
# Metrics collection
@worker_ready.connect
def register_worker(sender=None, **kwargs):
    """Register worker with monitoring system.

    Posts a worker_ready event to the monitoring webhook; failures are
    printed, never raised.
    """
    payload = dict(
        event='worker_ready',
        hostname=sender.hostname,
        pid=getattr(sender, 'pid', None),
        timestamp=datetime.now().isoformat(),
    )
    try:
        requests.post(f'{WEBHOOK_URL}/workers', json=payload, timeout=5)
    except Exception as e:
        print(f'Failed to register worker: {e}')

from celery.signals import before_task_publish
@before_task_publish.connect
def route_priority_tasks(sender=None, headers=None, body=None, routing_key=None, **kwargs):
    """Dynamically route tasks based on priority.

    Reads the task name from the message headers and a 'priority' keyword
    argument from the message body, then rewrites headers['routing_key']
    for high-priority or known long-running tasks.

    NOTE(review): this assumes the publisher honors a routing_key entry in
    headers; the routing_key signal argument itself is only read here —
    confirm this actually reroutes in the target Celery version.
    """
    task_name = headers.get('task')
    priority = body.get('kwargs', {}).get('priority', 'normal')
    # Route high priority tasks to dedicated queue
    if priority == 'high':
        headers['routing_key'] = 'high_priority'
        print(f'Routing {task_name} to high priority queue')
    # Route long-running tasks to separate workers
    elif task_name in ['process_large_file', 'generate_report']:
        headers['routing_key'] = 'long_running'
        print(f'Routing {task_name} to long running queue')

# Import the task signals explicitly rather than `from celery.signals import *`;
# wildcard imports hide which names the next section depends on.
from celery.signals import (
    task_prerun, task_postrun, task_success, task_failure,
    task_retry, task_revoked,
)
# Debug signal that logs all task activity
# One handler connected to six different task signals; kwargs values are
# stringified so the payload is JSON-serializable (einfo excluded).
@task_prerun.connect
@task_postrun.connect
@task_success.connect
@task_failure.connect
@task_retry.connect
@task_revoked.connect
def debug_task_signals(sender=None, **kwargs):
    """Log all task signals for debugging."""
    import inspect
    # NOTE(review): stack()[1] is the caller's frame — under real signal
    # dispatch that is the dispatcher, not a '*_handler' function, so
    # signal_name may not identify the firing signal. Confirm.
    signal_name = inspect.stack()[1].function.replace('_handler', '')
    debug_info = {
        'signal': signal_name,
        'task': getattr(sender, 'name', str(sender)) if sender else None,
        'data': {k: str(v) for k, v in kwargs.items() if k not in ['einfo']}
    }
    # json is imported earlier in this document's examples.
    print(f'DEBUG SIGNAL: {json.dumps(debug_info, indent=2)}')
# Performance monitoring
# Maps task_id -> wall-clock start time; entries are removed by end_timer().
task_times = {}
@task_prerun.connect
def start_timer(sender=None, task_id=None, **kwargs):
    """Start timing task execution.

    Records the start time so end_timer() can report the duration.
    """
    task_times[task_id] = time.time()
@task_postrun.connect
def end_timer(sender=None, task_id=None, **kwargs):
    """End timing and log duration.

    Pops the start time recorded by start_timer(); silently skips tasks for
    which no start time was recorded.
    """
    started = task_times.pop(task_id, None)
    if started is not None:
        duration = time.time() - started
        print(f'Task {sender.name}[{task_id}] took {duration:.3f}s')

from celery.signals import task_failure
import os
# Only handle failures in production
@task_failure.connect
def production_error_handler(sender=None, task_id=None, exception=None, **kwargs):
    """Handle errors differently in production vs development.

    In production, forwards the error to Sentry and pages on-call for
    critical tasks; in any other environment it only prints.
    """
    if os.environ.get('ENVIRONMENT') == 'production':
        # Send to error tracking service
        send_to_sentry(exception, task_id, sender.name)
        # Page on-call engineer for critical tasks
        if sender.name in ['process_payment', 'send_notification']:
            page_oncall_engineer(f'Critical task {sender.name} failed: {exception}')
    else:
        # Just log in development
        print(f'DEV: Task {sender.name}[{task_id}] failed: {exception}')
def send_to_sentry(exception, task_id, task_name):
    """Forward a task failure to the Sentry error-tracking service.

    Placeholder: the real Sentry integration code goes here.
    """
    pass
def page_oncall_engineer(message):
    """Page the on-call engineer with an urgent alert message.

    Placeholder: the real alerting-system integration goes here.
    """
    pass

# Install with Tessl CLI
npx tessl i tessl/pypi-celery