CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-celery

Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/queue-monitoring.md

Queue Monitoring

The queue monitoring capabilities provide sensors and utilities for monitoring Celery queue states, waiting for queues to be empty, and tracking task completion across distributed workers.

Capabilities

CeleryQueueSensor

Sensor that waits for a Celery queue to be empty before proceeding with downstream tasks.

class CeleryQueueSensor(BaseSensorOperator):
    """
    Sensor that waits for a Celery queue to be empty.

    By default, considers a queue empty when it has no tasks in the
    'reserved', 'scheduled', or 'active' states.

    NOTE(review): this listing is an interface stub (signatures and
    docstrings only); the concrete implementation lives in the
    provider package.
    """

    def __init__(self, *, celery_queue: str, target_task_id: str | None = None, **kwargs) -> None:
        """
        Initialize the Celery queue sensor.

        Parameters:
        - celery_queue: str, name of the Celery queue to monitor
        - target_task_id: str | None, optional specific task ID to monitor
          (presumably the sensor then waits for that one task instead of
          the whole queue -- TODO confirm against the implementation)
        - **kwargs: Additional sensor parameters (timeout, poke_interval,
          etc.) forwarded to BaseSensorOperator
        """

    def poke(self, context: Context) -> bool:
        """
        Check if the Celery queue meets the empty condition.

        Called repeatedly by the sensor framework every `poke_interval`
        seconds until it returns True or `timeout` elapses.

        Parameters:
        - context: Context, Airflow task execution context

        Returns:
        bool: True if queue is empty (sensor succeeds), False otherwise

        Queue States Checked:
        - reserved: Tasks claimed by workers but not yet started
        - scheduled: Tasks scheduled for future execution
        - active: Tasks currently being executed
        """

Queue State Inspection

Utilities for inspecting Celery queue states and worker status.

def get_queue_stats(queue_name: str) -> dict:
    """
    Get comprehensive statistics for a Celery queue.

    Parameters:
    - queue_name: str, name of the queue to inspect

    Returns:
    dict containing:
    - 'reserved': int, number of reserved tasks
    - 'scheduled': int, number of scheduled tasks
    - 'active': int, number of active tasks
    - 'total': int, total tasks in queue
    - 'workers': list, workers consuming from this queue

    NOTE(review): interface stub -- counts presumably reflect only the
    workers reachable at inspection time; confirm with the implementation.
    """

def get_worker_stats() -> dict:
    """
    Get statistics for all active Celery workers.

    Returns:
    dict mapping worker hostnames to their statistics:
    - 'active': int, number of active tasks on worker
    - 'processed': int, total tasks processed by worker
    - 'load': list, recent load averages
    - 'queues': list, queues this worker consumes from

    NOTE(review): interface stub -- workers that do not reply to the
    inspection broadcast are presumably absent from the result; verify.
    """

def is_queue_empty(queue_name: str, check_active: bool = True, 
                  check_scheduled: bool = True, check_reserved: bool = True) -> bool:
    """
    Check if a queue is empty based on specified criteria.

    Each `check_*` flag includes or excludes one task state from the
    emptiness test; with all three disabled the queue is presumably
    always considered empty (TODO confirm against the implementation).

    Parameters:
    - queue_name: str, queue to check
    - check_active: bool, include active tasks in emptiness check
    - check_scheduled: bool, include scheduled tasks in emptiness check
    - check_reserved: bool, include reserved tasks in emptiness check

    Returns:
    bool: True if queue meets emptiness criteria
    """

Usage Examples

Basic Queue Monitoring

from airflow import DAG
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Example DAG: gate batch processing on the 'high_priority' queue draining.
dag = DAG(
    'queue_monitoring_example',
    default_args={
        'start_date': datetime(2024, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    # 'schedule' replaces 'schedule_interval', which is deprecated since
    # Airflow 2.4 and removed in Airflow 3.0.
    schedule=timedelta(hours=1),
    catchup=False  # do not backfill runs for past intervals
)

# Wait for high priority queue to be empty before proceeding
wait_for_high_priority = CeleryQueueSensor(
    task_id='wait_for_high_priority_empty',
    celery_queue='high_priority',
    timeout=600,  # 10 minutes timeout
    poke_interval=30,  # Check every 30 seconds
    dag=dag
)

# Process batch only after high priority tasks complete
process_batch = PythonOperator(
    task_id='process_batch',
    python_callable=lambda: print("Processing batch data..."),
    dag=dag
)

# Sensor gates the batch task: it only runs once the queue is empty.
wait_for_high_priority >> process_batch

Advanced Queue Monitoring

from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor

# NOTE(review): this snippet reuses the `dag` object and the
# PythonOperator import defined in the basic example above.

# Monitor specific task completion
wait_for_specific_task = CeleryQueueSensor(
    task_id='wait_for_ml_training',
    celery_queue='ml_training',
    target_task_id='train_model_task_123',  # Wait for specific task
    timeout=3600,  # 1 hour timeout for ML training
    poke_interval=60,  # Check every minute
    dag=dag
)

# Multiple queue coordination: two sensors watch independent queues.
wait_for_multiple_queues = CeleryQueueSensor(
    task_id='wait_for_data_processing',
    celery_queue='data_processing', 
    timeout=1800,  # 30 minutes
    poke_interval=45,
    dag=dag
)

wait_for_cleanup = CeleryQueueSensor(
    task_id='wait_for_cleanup',
    celery_queue='cleanup',
    timeout=300,  # 5 minutes  
    poke_interval=15,
    dag=dag
)

# Proceed only after both queues are empty
final_task = PythonOperator(
    task_id='finalize_processing',
    python_callable=lambda: print("All queues empty, finalizing..."),
    dag=dag
)

# A list on the left makes both sensors upstream of final_task.
[wait_for_multiple_queues, wait_for_cleanup] >> final_task

Custom Queue State Checking

from airflow.operators.python import PythonOperator
from airflow.providers.celery.executors.celery_executor_utils import app

def _tasks_for_queue(tasks_by_worker, routing_key):
    """Count tasks across all workers whose routing key matches *routing_key*.

    Entries from inspect().scheduled() nest the task payload under a
    'request' key; active/reserved entries are flat, so fall back to the
    entry itself.
    """
    count = 0
    for worker_tasks in (tasks_by_worker or {}).values():
        for task in worker_tasks:
            payload = task.get('request', task)
            if payload.get('delivery_info', {}).get('routing_key') == routing_key:
                count += 1
    return count


def custom_queue_check(**context):
    """Custom logic for checking queue states.

    Returns True only when 'target_queue' has no active, scheduled, or
    reserved tasks on any worker.  (The original example printed the
    scheduled/reserved totals but ignored them in the final decision,
    contradicting the emptiness contract stated above.)
    """

    # Get Celery inspector
    inspector = app.control.inspect()

    empty = True
    # Check each task state the emptiness contract cares about.
    for state_name, fetch in (
        ('active', inspector.active),
        ('scheduled', inspector.scheduled),
        ('reserved', inspector.reserved),
    ):
        tasks_by_worker = fetch()
        if not tasks_by_worker:
            continue  # no worker reported tasks in this state
        total = sum(len(tasks) for tasks in tasks_by_worker.values())
        print(f"Total {state_name} tasks: {total}")

        # Only tasks routed to our target queue block the check.
        if _tasks_for_queue(tasks_by_worker, 'target_queue'):
            print(f"Target queue has {state_name} tasks")
            empty = False

    return empty

# Run the custom check as a task; the callable's boolean return value is
# pushed to XCom by PythonOperator, so downstream tasks can branch on it.
custom_check = PythonOperator(
    task_id='custom_queue_check',
    python_callable=custom_queue_check,
    dag=dag
)

Queue Health Monitoring

def monitor_queue_health(**context):
    """Monitor overall queue health and worker status.

    Pings workers, prints per-worker statistics, flags high failure
    rates, and lists the queues each worker consumes from.

    Raises:
        RuntimeError: if no Celery workers respond to the broadcast ping.
    """

    inspector = app.control.inspect()

    # Worker availability: ping() returns a {hostname: reply} mapping
    # containing only the workers that answered.
    ping_results = inspector.ping()
    active_workers = len(ping_results) if ping_results else 0
    print(f"Active workers: {active_workers}")

    if active_workers == 0:
        # RuntimeError instead of bare Exception: callers catching
        # Exception still catch it, but the type is meaningful.
        raise RuntimeError("No active Celery workers found!")

    # Check worker statistics
    stats = inspector.stats()
    if stats:
        for worker, worker_stats in stats.items():
            total_tasks = worker_stats.get('total', {})
            print(f"Worker {worker}: {total_tasks}")

            # NOTE(review): Celery's stats()['total'] maps task *names* to
            # execution counts; the 'failed'/'completed' keys assumed here
            # may not exist -- confirm against the Celery version in use.
            failed_tasks = total_tasks.get('failed', 0)
            completed_tasks = total_tasks.get('completed', 0)

            # Alert if more than 10% of completed tasks failed.
            if completed_tasks > 0 and failed_tasks / completed_tasks > 0.1:
                print(f"WARNING: Worker {worker} has high failure rate: "
                      f"{failed_tasks}/{completed_tasks}")

    # Report which queues each worker consumes from.
    active_queues = inspector.active_queues()
    if active_queues:
        for worker, queues in active_queues.items():
            for queue_info in queues:
                queue_name = queue_info.get('name', 'unknown')
                print(f"Worker {worker} consuming from queue: {queue_name}")
# Schedule the health probe as its own task so a dead worker pool fails
# the DAG run early instead of stalling downstream sensors.
health_check = PythonOperator(
    task_id='queue_health_check',
    python_callable=monitor_queue_health,
    dag=dag
)

Integration with Task Dependencies

# Use queue sensors to coordinate complex workflows

# Data pipeline with queue coordination
# NOTE(review): extract_data_function, transform_data_function, and
# load_data_function are placeholders -- define them before running
# this example.  The `queue=` argument routes each task to the workers
# consuming that Celery queue.
extract_data = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data_function,
    queue='extraction',  # Route to extraction workers
    dag=dag
)

# Wait for all extraction tasks to complete
wait_extraction_complete = CeleryQueueSensor(
    task_id='wait_extraction_complete',
    celery_queue='extraction',
    timeout=1800,
    poke_interval=60,
    dag=dag
)

# Transform data after extraction is complete
transform_data = PythonOperator(
    task_id='transform_data', 
    python_callable=transform_data_function,
    queue='transformation',
    dag=dag
)

# Wait for transformations
wait_transformation_complete = CeleryQueueSensor(
    task_id='wait_transformation_complete',
    celery_queue='transformation',
    timeout=3600,
    poke_interval=60,
    dag=dag
)

# Load data after transformations complete
load_data = PythonOperator(
    task_id='load_data',
    python_callable=load_data_function,
    queue='loading',
    dag=dag
)

# Pipeline flow: each sensor drains its queue before the next stage runs.
extract_data >> wait_extraction_complete >> transform_data >> wait_transformation_complete >> load_data

Configuration

Sensor behavior can be configured through standard Airflow sensor parameters:

# Common sensor configuration options (inherited from BaseSensorOperator)
timeout: int = 60 * 60 * 24 * 7  # Default: 7 days; sensor fails after this
poke_interval: int = 60  # Default: 60 seconds between queue checks
mode: str = 'poke'  # 'poke' holds a worker slot; 'reschedule' frees it between checks
exponential_backoff: bool = False  # When True, poke interval grows after each miss
max_retry_delay: timedelta | None = None  # Upper bound on the backoff interval
soft_fail: bool = False  # When True, mark the task SKIPPED (not FAILED) on timeout

Error Handling

from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException

# Handle sensor timeouts
# NOTE(review): `context` must be an Airflow task Context; calling
# .execute() by hand like this is only for illustration/tests -- in a
# real DAG the scheduler invokes the sensor for you.
try:
    sensor_result = CeleryQueueSensor(
        task_id='wait_for_queue',
        celery_queue='my_queue',
        timeout=300,
        dag=dag
    ).execute(context)
except AirflowSensorTimeout:
    # Queue didn't empty within timeout
    # Decide whether to fail, skip, or retry
    print("Queue monitoring timed out")
    raise  # re-raise so the task run is marked failed

# Skip downstream tasks if queue check fails
def conditional_queue_check(**context):
    """Skip downstream tasks when the critical queue still has work.

    Raises AirflowSkipException (which Airflow turns into a SKIPPED
    state) unless 'critical_queue' is empty; returns True otherwise.
    """
    queue_is_clear = is_queue_empty('critical_queue')
    if queue_is_clear:
        return True
    raise AirflowSkipException("Queue not empty, skipping downstream tasks")

# Wrap the conditional check in a task; a raised AirflowSkipException
# skips this task (and, with default trigger rules, its downstream tasks).
conditional_sensor = PythonOperator(
    task_id='conditional_queue_check',
    python_callable=conditional_queue_check,
    dag=dag
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-celery

docs

celery-executor.md

celery-kubernetes-executor.md

cli-commands.md

configuration.md

index.md

queue-monitoring.md

tile.json