CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-celery

Distributed Task Queue for Python that enables asynchronous task execution across multiple workers and machines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

signals-events.mddocs/

Signals and Events

Signal-based event system for monitoring task lifecycle, worker status, and system events. These hooks enable custom logging, monitoring, debugging, and integration with external systems throughout Celery's execution pipeline.

Capabilities

Task Lifecycle Signals

Signals fired during task execution phases, providing hooks for monitoring, logging, and custom behavior at each stage.

# Task execution signals
before_task_publish = Signal()
after_task_publish = Signal()
task_prerun = Signal()
task_postrun = Signal()
task_success = Signal() 
task_failure = Signal()
task_retry = Signal()
task_revoked = Signal()
task_received = Signal()
task_rejected = Signal()
task_unknown = Signal()

class Signal:
    def connect(self, callback, sender=None, weak=True, dispatch_uid=None):
        """
        Connect callback to signal.
        
        Args:
            callback (callable): Function to call when signal fires
            sender: Signal sender filter (None for all senders)
            weak (bool): Use weak references to callback
            dispatch_uid: Unique identifier for this connection
        """

    def disconnect(self, callback=None, sender=None, dispatch_uid=None):
        """
        Disconnect callback from signal.
        
        Args:
            callback (callable): Callback to disconnect
            sender: Sender filter
            dispatch_uid: Connection identifier
            
        Returns:
            bool: True if disconnected
        """

    def send(self, sender, **kwargs):
        """
        Send signal to all connected callbacks.
        
        Args:
            sender: Signal sender
            **kwargs: Signal data
            
        Returns:
            list: [(receiver, response), ...] for each callback
        """

def before_task_publish(sender=None, headers=None, body=None, routing_key=None, exchange=None, declare=None, retry_policy=None, **kwargs):
    """
    Signal fired before task is published to broker.
    
    Args:
        sender: Publisher instance
        headers (dict): Message headers
        body (dict): Message body  
        routing_key (str): Message routing key
        exchange (str): Exchange name
        declare (list): Exchanges/queues to declare
        retry_policy (dict): Retry configuration
    """

def after_task_publish(sender=None, headers=None, body=None, routing_key=None, exchange=None, **kwargs):
    """
    Signal fired after task is published to broker.
    
    Args:
        sender: Publisher instance
        headers (dict): Message headers
        body (dict): Message body
        routing_key (str): Message routing key  
        exchange (str): Exchange name
    """

def task_prerun(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """
    Signal fired before task execution.
    
    Args:
        sender: Task class
        task_id (str): Task ID
        task: Task instance
        args (tuple): Task arguments
        kwargs (dict): Task keyword arguments
    """

def task_postrun(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    """
    Signal fired after task execution.
    
    Args:
        sender: Task class
        task_id (str): Task ID
        task: Task instance
        args (tuple): Task arguments
        kwargs (dict): Task keyword arguments
        retval: Task return value
        state (str): Final task state
    """

def task_success(sender=None, result=None, **kwds):
    """
    Signal fired when task succeeds.
    
    Args:
        sender: Task class
        result: Task return value
    """

def task_failure(sender=None, task_id=None, exception=None, einfo=None, **kwds):
    """
    Signal fired when task fails.
    
    Args:
        sender: Task class  
        task_id (str): Task ID
        exception: Exception instance
        einfo: Exception info object
    """

def task_retry(sender=None, task_id=None, reason=None, einfo=None, **kwds):
    """
    Signal fired when task is retried.
    
    Args:
        sender: Task class
        task_id (str): Task ID
        reason: Retry reason/exception
        einfo: Exception info
    """

def task_revoked(sender=None, request=None, terminated=None, signum=None, expired=None, **kwds):
    """
    Signal fired when task is revoked.
    
    Args:
        sender: Task class
        request: Task request object
        terminated (bool): Process was terminated
        signum (int): Signal number if terminated
        expired (bool): Task expired
    """

def task_received(sender=None, request=None, **kwargs):
    """
    Signal fired when worker receives task.
    
    Args:
        sender: Consumer instance
        request: Task request object
    """

def task_rejected(sender=None, message=None, exc=None, **kwargs):
    """
    Signal fired when task is rejected.
    
    Args:
        sender: Consumer instance  
        message: Rejected message
        exc: Rejection exception
    """

def task_unknown(sender=None, message=None, exc=None, name=None, id=None, **kwargs):
    """
    Signal fired when unknown task received.
    
    Args:
        sender: Consumer instance
        message: Unknown message
        exc: Exception raised
        name (str): Unknown task name
        id (str): Task ID
    """

Worker Lifecycle Signals

Signals for monitoring worker startup, shutdown, and process management events.

# Worker lifecycle signals
worker_init = Signal()
worker_process_init = Signal()
worker_process_shutdown = Signal()
worker_ready = Signal()
worker_shutdown = Signal()
worker_shutting_down = Signal()
celeryd_init = Signal()
celeryd_after_setup = Signal()

def worker_init(sender=None, **kwargs):
    """
    Signal fired when worker initializes.
    
    Args:
        sender: Worker instance
    """

def worker_process_init(sender=None, **kwargs):
    """
    Signal fired when worker process initializes.
    
    Args:
        sender: Worker instance
    """

def worker_process_shutdown(sender=None, pid=None, exitcode=None, **kwargs):
    """
    Signal fired when worker process shuts down.
    
    Args:
        sender: Worker instance
        pid (int): Process ID
        exitcode (int): Process exit code
    """

def worker_ready(sender=None, **kwargs):
    """
    Signal fired when worker is ready to receive tasks.
    
    Args:
        sender: Worker instance
    """

def worker_shutdown(sender=None, **kwargs):
    """
    Signal fired when worker shuts down.
    
    Args:
        sender: Worker instance
    """

def worker_shutting_down(sender=None, **kwargs):
    """
    Signal fired when worker begins shutdown.
    
    Args:
        sender: Worker instance
    """

def celeryd_init(sender=None, instance=None, conf=None, **kwargs):
    """
    Signal fired when celery daemon initializes.
    
    Args:
        sender: Worker class
        instance: Worker instance
        conf: Configuration object
    """

def celeryd_after_setup(sender=None, instance=None, conf=None, **kwargs):
    """
    Signal fired after celery daemon setup.
    
    Args:
        sender: Worker class
        instance: Worker instance  
        conf: Configuration object
    """

Beat Scheduler Signals

Signals for monitoring periodic task scheduler events and beat service lifecycle.

# Beat scheduler signals  
beat_init = Signal()
beat_embedded_init = Signal()

def beat_init(sender=None, **kwargs):
    """
    Signal fired when beat scheduler initializes.
    
    Args:
        sender: Beat service instance
    """

def beat_embedded_init(sender=None, **kwargs):
    """
    Signal fired when embedded beat initializes.
    
    Args:
        sender: Beat service instance  
    """

Logging and Setup Signals

Signals for customizing logging configuration and system setup procedures.

# Logging signals
setup_logging = Signal()
after_setup_logger = Signal()  
after_setup_task_logger = Signal()

def setup_logging(sender=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
    """
    Signal fired during logging setup.
    
    Args:
        sender: Logging setup caller
        loglevel (int): Log level
        logfile (str): Log file path
        format (str): Log format string
        colorize (bool): Enable colored output
    """

def after_setup_logger(sender=None, logger=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
    """
    Signal fired after logger setup.
    
    Args:
        sender: Logger setup caller
        logger: Logger instance
        loglevel (int): Log level
        logfile (str): Log file path  
        format (str): Log format
        colorize (bool): Colored output enabled
    """

def after_setup_task_logger(sender=None, logger=None, loglevel=None, logfile=None, format=None, colorize=None, **kwargs):
    """
    Signal fired after task logger setup.
    
    Args:
        sender: Task logger setup caller
        logger: Task logger instance
        loglevel (int): Log level
        logfile (str): Log file path
        format (str): Log format  
        colorize (bool): Colored output enabled
    """

Signal Connection Decorators

Convenience decorators for connecting signal handlers to specific events.

def receiver(signal, **kwargs):
    """
    Decorator for connecting signal handlers.
    
    Args:
        signal: Signal instance to connect to
        **kwargs: Connection options (sender, dispatch_uid, etc.)
        
    Returns:
        Decorator function
    """

# Usage patterns
@receiver(task_success)
def task_success_handler(sender=None, result=None, **kwargs):
    """Handle task success."""
    pass

@receiver(task_failure)  
def task_failure_handler(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
    """Handle task failure."""
    pass

Usage Examples

Basic Signal Handlers

from celery import Celery
from celery.signals import (
    task_prerun, task_postrun, task_success, task_failure,
    worker_ready, worker_shutdown
)

app = Celery('signal_example')

@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """Log task start."""
    print(f'Task {task.name}[{task_id}] starting with args={args}, kwargs={kwargs}')

@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    """Log task completion."""
    print(f'Task {task.name}[{task_id}] finished with state={state}, result={retval}')

@task_success.connect
def task_success_handler(sender=None, result=None, **kwargs):
    """Handle successful task completion."""
    print(f'Task {sender.name} succeeded with result: {result}')

@task_failure.connect
def task_failure_handler(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
    """Handle task failure."""
    print(f'Task {sender.name}[{task_id}] failed: {exception}')
    print(f'Traceback: {einfo.traceback}')

@worker_ready.connect
def worker_ready_handler(sender=None, **kwargs):
    """Log when worker becomes ready."""
    print(f'Worker {sender.hostname} is ready to receive tasks')

@worker_shutdown.connect 
def worker_shutdown_handler(sender=None, **kwargs):
    """Log worker shutdown."""
    print(f'Worker {sender.hostname} is shutting down')

Advanced Monitoring and Metrics

import time
import json
from datetime import datetime
from celery.signals import (
    before_task_publish, after_task_publish,
    task_prerun, task_postrun, task_success, task_failure,
    worker_process_init
)

# Task execution metrics
task_metrics = {
    'published': 0,
    'started': 0, 
    'completed': 0,
    'failed': 0,
    'total_execution_time': 0
}

@before_task_publish.connect
def track_task_published(sender=None, headers=None, body=None, **kwargs):
    """Track task publication."""
    task_metrics['published'] += 1
    task_name = headers.get('task', 'unknown')
    print(f'Publishing task: {task_name}')

@task_prerun.connect  
def track_task_start(sender=None, task_id=None, task=None, **kwargs):
    """Track task execution start."""
    task_metrics['started'] += 1
    
    # Store start time for duration calculation
    task.request.start_time = time.time()
    
    print(f'Starting task {task.name}[{task_id}] at {datetime.now()}')

@task_postrun.connect
def track_task_completion(sender=None, task_id=None, task=None, state=None, **kwargs):
    """Track task completion and calculate duration."""
    task_metrics['completed'] += 1
    
    # Calculate execution time
    if hasattr(task.request, 'start_time'):
        duration = time.time() - task.request.start_time
        task_metrics['total_execution_time'] += duration
        print(f'Task {task.name}[{task_id}] completed in {duration:.2f}s with state {state}')

@task_failure.connect
def track_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    """Track task failures."""
    task_metrics['failed'] += 1
    print(f'Task {sender.name}[{task_id}] failed: {type(exception).__name__}: {exception}')

def print_metrics():
    """Print current metrics."""
    print("\n=== Task Metrics ===")
    print(f"Published: {task_metrics['published']}")
    print(f"Started: {task_metrics['started']}")  
    print(f"Completed: {task_metrics['completed']}")
    print(f"Failed: {task_metrics['failed']}")
    if task_metrics['completed'] > 0:
        avg_time = task_metrics['total_execution_time'] / task_metrics['completed']
        print(f"Average execution time: {avg_time:.2f}s")

@worker_process_init.connect
def setup_worker_monitoring(sender=None, **kwargs):
    """Setup worker-specific monitoring."""
    print(f'Worker process {sender.hostname} initialized with PID {sender.pid}')

Custom Logging Integration

import logging
from celery.signals import (
    setup_logging, after_setup_logger, 
    task_prerun, task_postrun, task_failure
)

# Custom logger setup
@setup_logging.connect  
def setup_custom_logging(sender=None, loglevel=None, logfile=None, **kwargs):
    """Setup custom logging configuration."""
    
    # Configure root logger
    logging.basicConfig(
        level=loglevel,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(logfile or 'celery.log'),
            logging.StreamHandler()
        ]
    )
    
    # Disable default Celery logging setup
    return True

@after_setup_logger.connect
def configure_task_logger(sender=None, logger=None, **kwargs):
    """Configure task-specific logger."""
    
    # Add custom handler for task logs
    task_handler = logging.FileHandler('tasks.log')
    task_handler.setFormatter(logging.Formatter(
        '%(asctime)s [%(levelname)s] %(name)s: %(message)s'
    ))
    
    logger.addHandler(task_handler)
    logger.info('Custom task logger configured')

# Task-specific logging  
@task_prerun.connect
def log_task_start(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """Log task start with context."""
    logger = logging.getLogger(f'tasks.{task.name}')
    logger.info(f'Starting task {task_id} with args={args}, kwargs={kwargs}')

@task_postrun.connect  
def log_task_end(sender=None, task_id=None, task=None, retval=None, state=None, **kwds):
    """Log task completion."""
    logger = logging.getLogger(f'tasks.{task.name}')
    logger.info(f'Task {task_id} finished with state={state}')

@task_failure.connect
def log_task_error(sender=None, task_id=None, exception=None, einfo=None, **kwargs):
    """Log task errors with full context."""
    logger = logging.getLogger(f'tasks.{sender.name}')
    logger.error(f'Task {task_id} failed: {exception}', exc_info=einfo)

External System Integration

import json
import requests
from celery.signals import task_success, task_failure, worker_ready

# Webhook notifications
WEBHOOK_URL = 'https://monitoring.example.com/webhook'

@task_success.connect
def notify_task_success(sender=None, result=None, **kwargs):
    """Send webhook notification on task success."""
    
    payload = {
        'event': 'task_success',
        'task_name': sender.name,
        'task_id': kwargs.get('task_id'),
        'result': str(result),
        'timestamp': datetime.now().isoformat()
    }
    
    try:
        requests.post(WEBHOOK_URL, json=payload, timeout=5)
    except Exception as e:
        print(f'Failed to send webhook: {e}')

@task_failure.connect
def notify_task_failure(sender=None, task_id=None, exception=None, **kwargs):
    """Send alert on task failure."""
    
    payload = {
        'event': 'task_failure',
        'task_name': sender.name,
        'task_id': task_id,
        'error': str(exception),
        'error_type': type(exception).__name__,
        'timestamp': datetime.now().isoformat(),
        'severity': 'high'
    }
    
    try:
        requests.post(WEBHOOK_URL, json=payload, timeout=5)
    except Exception as e:
        print(f'Failed to send failure alert: {e}')

# Metrics collection
@worker_ready.connect
def register_worker(sender=None, **kwargs):
    """Register worker with monitoring system."""
    
    payload = {
        'event': 'worker_ready',
        'hostname': sender.hostname,
        'pid': getattr(sender, 'pid', None),
        'timestamp': datetime.now().isoformat()
    }
    
    try:
        requests.post(f'{WEBHOOK_URL}/workers', json=payload, timeout=5)
    except Exception as e:
        print(f'Failed to register worker: {e}')

Signal-Based Task Routing

from celery.signals import before_task_publish

@before_task_publish.connect
def route_priority_tasks(sender=None, headers=None, body=None, routing_key=None, **kwargs):
    """Dynamically route tasks based on priority."""
    
    task_name = headers.get('task')
    priority = body.get('kwargs', {}).get('priority', 'normal')
    
    # Route high priority tasks to dedicated queue
    if priority == 'high':
        headers['routing_key'] = 'high_priority'
        print(f'Routing {task_name} to high priority queue')
    
    # Route long-running tasks to separate workers
    elif task_name in ['process_large_file', 'generate_report']:
        headers['routing_key'] = 'long_running'
        print(f'Routing {task_name} to long running queue')

Debugging and Development

from celery.signals import *

# Debug signal that logs all task activity
@task_prerun.connect
@task_postrun.connect  
@task_success.connect
@task_failure.connect
@task_retry.connect
@task_revoked.connect
def debug_task_signals(sender=None, **kwargs):
    """Log all task signals for debugging."""
    
    import inspect
    signal_name = inspect.stack()[1].function.replace('_handler', '')
    
    debug_info = {
        'signal': signal_name,
        'task': getattr(sender, 'name', str(sender)) if sender else None,
        'data': {k: str(v) for k, v in kwargs.items() if k not in ['einfo']}
    }
    
    print(f'DEBUG SIGNAL: {json.dumps(debug_info, indent=2)}')

# Performance monitoring
task_times = {}

@task_prerun.connect
def start_timer(sender=None, task_id=None, **kwargs):
    """Start timing task execution."""
    task_times[task_id] = time.time()

@task_postrun.connect
def end_timer(sender=None, task_id=None, **kwargs):
    """End timing and log duration."""
    if task_id in task_times:
        duration = time.time() - task_times[task_id]
        print(f'Task {sender.name}[{task_id}] took {duration:.3f}s')
        del task_times[task_id]

Conditional Signal Handlers

from celery.signals import task_failure
import os

# Only handle failures in production
@task_failure.connect
def production_error_handler(sender=None, task_id=None, exception=None, **kwargs):
    """Handle errors differently in production vs development."""
    
    if os.environ.get('ENVIRONMENT') == 'production':
        # Send to error tracking service
        send_to_sentry(exception, task_id, sender.name)
        
        # Page on-call engineer for critical tasks
        if sender.name in ['process_payment', 'send_notification']:
            page_oncall_engineer(f'Critical task {sender.name} failed: {exception}')
    else:
        # Just log in development
        print(f'DEV: Task {sender.name}[{task_id}] failed: {exception}')

def send_to_sentry(exception, task_id, task_name):
    """Send error to Sentry.""" 
    # Sentry integration code
    pass

def page_oncall_engineer(message):
    """Send alert to on-call engineer."""
    # Alerting system integration
    pass

Install with Tessl CLI

npx tessl i tessl/pypi-celery

docs

configuration.md

core-application.md

exceptions.md

index.md

results-state.md

scheduling-beat.md

signals-events.md

workflow-primitives.md

tile.json