Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow
Quality: Pending — a best-practices review ("Does it follow best practices?") has not yet been completed.
Impact: Pending — no eval scenarios have been run.
The queue monitoring capabilities provide sensors and utilities for monitoring Celery queue states, waiting for queues to be empty, and tracking task completion across distributed workers.
Sensor that waits for a Celery queue to be empty before proceeding with downstream tasks.
class CeleryQueueSensor(BaseSensorOperator):
"""
Sensor that waits for a Celery queue to be empty.
By default, considers a queue empty when it has no tasks in the
'reserved', 'scheduled', or 'active' states.
"""
def __init__(self, *, celery_queue: str, target_task_id: str | None = None, **kwargs) -> None:
"""
Initialize the Celery queue sensor.
Parameters:
- celery_queue: str, name of the Celery queue to monitor
- target_task_id: str | None, optional specific task ID to monitor
- **kwargs: Additional sensor parameters (timeout, poke_interval, etc.)
"""
def poke(self, context: Context) -> bool:
"""
Check if the Celery queue meets the empty condition.
Parameters:
- context: Context, Airflow task execution context
Returns:
bool: True if queue is empty (sensor succeeds), False otherwise
Queue States Checked:
- reserved: Tasks claimed by workers but not yet started
- scheduled: Tasks scheduled for future execution
- active: Tasks currently being executed
"""Utilities for inspecting Celery queue states and worker status.
def get_queue_stats(queue_name: str) -> dict:
    """Collect summary statistics for a single Celery queue.

    Parameters:
    - queue_name: str, name of the queue to inspect

    Returns:
    dict with the keys:
    - 'reserved': int, number of reserved tasks
    - 'scheduled': int, number of scheduled tasks
    - 'active': int, number of active tasks
    - 'total': int, total tasks in queue
    - 'workers': list, workers consuming from this queue
    """
def get_worker_stats() -> dict:
    """Report per-worker statistics for every active Celery worker.

    Returns:
    dict keyed by worker hostname, each value containing:
    - 'active': int, number of active tasks on worker
    - 'processed': int, total tasks processed by worker
    - 'load': list, recent load averages
    - 'queues': list, queues this worker consumes from
    """
def is_queue_empty(queue_name: str, check_active: bool = True,
                   check_scheduled: bool = True, check_reserved: bool = True) -> bool:
    """Decide whether a queue is empty under the selected criteria.

    Parameters:
    - queue_name: str, queue to check
    - check_active: bool, count active tasks toward the emptiness check
    - check_scheduled: bool, count scheduled tasks toward the emptiness check
    - check_reserved: bool, count reserved tasks toward the emptiness check

    Returns:
    bool: True if the queue meets the emptiness criteria
    """


from airflow import DAG
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

# Example DAG: gate downstream work on a Celery queue draining.
dag = DAG(
    'queue_monitoring_example',
    default_args={
        'start_date': datetime(2024, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    # 'schedule' replaces 'schedule_interval', which is deprecated since
    # Airflow 2.4 and removed in Airflow 3.
    schedule=timedelta(hours=1),
    catchup=False,
)

# Wait for high priority queue to be empty before proceeding
wait_for_high_priority = CeleryQueueSensor(
    task_id='wait_for_high_priority_empty',
    celery_queue='high_priority',
    timeout=600,       # 10 minutes timeout
    poke_interval=30,  # Check every 30 seconds
    dag=dag,
)

# Process batch only after high priority tasks complete
process_batch = PythonOperator(
    task_id='process_batch',
    python_callable=lambda: print("Processing batch data..."),
    dag=dag,
)

wait_for_high_priority >> process_batch

from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor
# Monitor specific task completion
wait_for_specific_task = CeleryQueueSensor(
    task_id='wait_for_ml_training',
    celery_queue='ml_training',
    target_task_id='train_model_task_123',  # Wait for specific task
    timeout=3600,      # 1 hour timeout for ML training
    poke_interval=60,  # Check every minute
    dag=dag,
)

# Multiple queue coordination
wait_for_multiple_queues = CeleryQueueSensor(
    task_id='wait_for_data_processing',
    celery_queue='data_processing',
    timeout=1800,  # 30 minutes
    poke_interval=45,
    dag=dag,
)

wait_for_cleanup = CeleryQueueSensor(
    task_id='wait_for_cleanup',
    celery_queue='cleanup',
    timeout=300,  # 5 minutes
    poke_interval=15,
    dag=dag,
)

# Proceed only after both queues are empty
final_task = PythonOperator(
    task_id='finalize_processing',
    python_callable=lambda: print("All queues empty, finalizing..."),
    dag=dag,
)

[wait_for_multiple_queues, wait_for_cleanup] >> final_task

from airflow.operators.python import PythonOperator
from airflow.providers.celery.executors.celery_executor_utils import app


def custom_queue_check(**context):
    """Custom logic for checking queue states.

    Inspects active, scheduled, and reserved tasks across all Celery
    workers. Returns False while any task routed to 'target_queue' is
    still pending, True once none remain.

    Fix over the original version: scheduled and reserved tasks were
    only counted and printed but never influenced the return value, so
    the queue could be reported empty while work was still pending.
    """

    def _routing_key(task: dict):
        # Active/reserved entries carry 'delivery_info' directly; scheduled
        # entries wrap the task under 'request'. Handle both shapes.
        # NOTE(review): confirm payload shapes against the Celery inspect API.
        info = task.get('delivery_info') or task.get('request', {}).get('delivery_info', {})
        return (info or {}).get('routing_key')

    def _has_target_tasks(tasks_by_worker: dict) -> bool:
        # True if any task on any worker is routed to 'target_queue'.
        return any(
            _routing_key(task) == 'target_queue'
            for worker_tasks in tasks_by_worker.values()
            for task in worker_tasks
        )

    inspector = app.control.inspect()

    # Check active tasks across all workers
    active_tasks = inspector.active()
    if active_tasks:
        total_active = sum(len(tasks) for tasks in active_tasks.values())
        print(f"Total active tasks: {total_active}")
        if _has_target_tasks(active_tasks):
            print("Target queue has active tasks")
            return False

    # Check scheduled tasks — these now also gate the result
    scheduled_tasks = inspector.scheduled()
    if scheduled_tasks:
        total_scheduled = sum(len(tasks) for tasks in scheduled_tasks.values())
        print(f"Total scheduled tasks: {total_scheduled}")
        if _has_target_tasks(scheduled_tasks):
            return False

    # Check reserved tasks — these now also gate the result
    reserved_tasks = inspector.reserved()
    if reserved_tasks:
        total_reserved = sum(len(tasks) for tasks in reserved_tasks.values())
        print(f"Total reserved tasks: {total_reserved}")
        if _has_target_tasks(reserved_tasks):
            return False

    return True  # No pending work for the target queue
custom_check = PythonOperator(
    task_id='custom_queue_check',
    python_callable=custom_queue_check,
    dag=dag,
)


def monitor_queue_health(**context):
    """Monitor overall queue health and worker status."""
    insp = app.control.inspect()

    # Worker availability: a ping with no replies means no workers are up.
    pong = insp.ping()
    active_workers = len(pong) if pong else 0
    print(f"Active workers: {active_workers}")
    if active_workers == 0:
        raise Exception("No active Celery workers found!")

    # Per-worker statistics.
    # NOTE(review): Celery's stats()['total'] maps task names to counts;
    # the 'failed'/'completed' keys assumed below may not exist — confirm
    # against the Celery inspect API.
    stats = insp.stats()
    if stats:
        for worker, worker_stats in stats.items():
            total_tasks = worker_stats.get('total', {})
            print(f"Worker {worker}: {total_tasks}")
            failed_tasks = total_tasks.get('failed', 0)
            completed_tasks = total_tasks.get('completed', 0)
            # Alert when more than 10% of completed tasks failed.
            if completed_tasks > 0 and failed_tasks / completed_tasks > 0.1:
                print(f"WARNING: Worker {worker} has high failure rate: "
                      f"{failed_tasks}/{completed_tasks}")

    # Report which queues each worker consumes from.
    active_queues = insp.active_queues()
    if active_queues:
        for worker, queues in active_queues.items():
            for queue_info in queues:
                queue_name = queue_info.get('name', 'unknown')
                print(f"Worker {worker} consuming from queue: {queue_name}")
health_check = PythonOperator(
    task_id='queue_health_check',
    python_callable=monitor_queue_health,
    dag=dag,
)

# Use queue sensors to coordinate complex workflows
# Data pipeline with queue coordination
extract_data = PythonOperator(
task_id='extract_data',
python_callable=extract_data_function,
queue='extraction', # Route to extraction workers
dag=dag
)
# Wait for all extraction tasks to complete
wait_extraction_complete = CeleryQueueSensor(
task_id='wait_extraction_complete',
celery_queue='extraction',
timeout=1800,
poke_interval=60,
dag=dag
)
# Transform data after extraction is complete
transform_data = PythonOperator(
task_id='transform_data',
python_callable=transform_data_function,
queue='transformation',
dag=dag
)
# Wait for transformations
wait_transformation_complete = CeleryQueueSensor(
task_id='wait_transformation_complete',
celery_queue='transformation',
timeout=3600,
poke_interval=60,
dag=dag
)
# Load data after transformations complete
load_data = PythonOperator(
task_id='load_data',
python_callable=load_data_function,
queue='loading',
dag=dag
)
# Pipeline flow with queue coordination
extract_data >> wait_extraction_complete >> transform_data >> wait_transformation_complete >> load_dataSensor behavior can be configured through standard Airflow sensor parameters:
# Common sensor configuration options
timeout: int = 60 * 60 * 24 * 7 # Default: 7 days
poke_interval: int = 60 # Default: 60 seconds
mode: str = 'poke' # 'poke' or 'reschedule'
exponential_backoff: bool = False
max_retry_delay: timedelta | None = None
soft_fail: bool = False

from airflow.exceptions import AirflowSensorTimeout, AirflowSkipException
# Handle sensor timeouts
try:
    # NOTE(review): 'context' is assumed to be supplied by the surrounding
    # example; it is not defined in this snippet.
    sensor_result = CeleryQueueSensor(
        task_id='wait_for_queue',
        celery_queue='my_queue',
        timeout=300,
        dag=dag,
    ).execute(context)
except AirflowSensorTimeout:
    # Queue didn't empty within timeout
    # Decide whether to fail, skip, or retry
    print("Queue monitoring timed out")
    raise
# Skip downstream tasks if queue check fails
def conditional_queue_check(**context):
    """Raise AirflowSkipException when 'critical_queue' still has tasks."""
    if not is_queue_empty('critical_queue'):
        raise AirflowSkipException("Queue not empty, skipping downstream tasks")
    return True
conditional_sensor = PythonOperator(
task_id='conditional_queue_check',
python_callable=conditional_queue_check,
dag=dag
)Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-celery