Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow
The Celery provider includes comprehensive command-line tools for managing Celery workers, monitoring with Flower, and performing queue operations. These commands extend Airflow's CLI with Celery-specific functionality.
Commands for starting, stopping, and managing Celery workers.
def worker(args):
    """
    Start a Celery worker process.

    Usage: airflow celery worker [options]

    Parameters (via args object):
    - concurrency: int, number of concurrent worker processes
    - hostname: str, worker hostname identifier
    - queues: list[str], specific queues for this worker to consume
    - loglevel: str, logging level (DEBUG, INFO, WARNING, ERROR)
    - autoscale: str, autoscaling configuration (max,min)
    - without_gossip: bool, disable gossip for worker discovery
    - without_mingle: bool, disable startup synchronization
    - without_heartbeat: bool, disable worker heartbeat
    - daemon: bool, run as daemon process
    - pidfile: str, path to PID file for daemon mode
    - logfile: str, path to log file

    Environment setup:
    - Configures worker process name and logging
    - Sets up signal handlers for graceful shutdown
    - Initializes Celery app with Airflow configuration
    """
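The autoscale option packs two integers into one "max,min" string that has to be parsed and validated before use. A minimal sketch of that parsing (`parse_autoscale` is a hypothetical helper for illustration, not provider code):

```python
# Hypothetical helper: parse an autoscale string like "16,4"
# into (max_workers, min_workers), validating the order.
def parse_autoscale(value: str) -> tuple[int, int]:
    try:
        max_workers, min_workers = (int(part) for part in value.split(","))
    except ValueError:
        raise ValueError(f"autoscale must be 'max,min', got {value!r}") from None
    if max_workers < min_workers:
        raise ValueError("autoscale max must be >= min")
    return max_workers, min_workers

print(parse_autoscale("16,4"))  # (16, 4)
```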
def stop_worker(args):
    """
    Stop a running Celery worker by PID (deprecated; use shutdown-worker instead).

    Usage: airflow celery stop [options]

    Parameters (via args object):
    - pid: str, path to PID file of worker to stop
    - verbose: bool, enable verbose output

    Reads PID from file and sends termination signal to worker process.
    """
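"Stop by PID file" generally means reading the recorded process ID and signalling that process. A stdlib-only sketch of the idea (`stop_worker_by_pidfile` is a hypothetical helper, not the provider's implementation):

```python
import os
import signal
import tempfile

# Hypothetical sketch: read the PID from the file, then send SIGTERM
# so the worker can terminate gracefully.
def stop_worker_by_pidfile(pid_path: str, dry_run: bool = False) -> int:
    with open(pid_path) as f:
        pid = int(f.read().strip())
    if not dry_run:
        os.kill(pid, signal.SIGTERM)  # graceful termination request
    return pid

# Demo against a throwaway PID file (dry_run avoids signalling anything)
with tempfile.NamedTemporaryFile("w", suffix=".pid", delete=False) as f:
    f.write("12345\n")
    path = f.name
print(stop_worker_by_pidfile(path, dry_run=True))  # 12345
os.unlink(path)
```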
def list_workers(args):
    """
    List all active Celery workers and their status.

    Usage: airflow celery list-workers [options]

    Parameters (via args object):
    - output: str, output format ('table', 'json', 'yaml', 'plain')

    Displays:
    - Worker hostnames and online status
    - Active task counts per worker
    - Queues each worker is consuming from
    - Load averages and resource usage
    """
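To make the output formats concrete, here is a small sketch of shaping worker rows into the 'json' and 'plain' styles that list-workers describes (field names and the `render` helper are illustrative assumptions, not the provider's actual output schema):

```python
import json

# Hypothetical helper: render worker status rows in two of the
# documented output formats ('json' and 'plain').
def render(rows, output: str = "plain") -> str:
    if output == "json":
        return json.dumps(rows, indent=2)
    # 'plain': one worker per line
    return "\n".join(
        f"{r['worker']} {r['status']} active={r['active']} queues={','.join(r['queues'])}"
        for r in rows
    )

workers = [
    {"worker": "worker-01@host", "status": "online", "active": 3, "queues": ["default"]},
]
print(render(workers))  # worker-01@host online active=3 queues=default
```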
def shutdown_worker(args):
    """
    Gracefully shut down a specific Celery worker.

    Usage: airflow celery shutdown-worker [options]

    Parameters (via args object):
    - celery_hostname: str, worker hostname to shut down (required)

    Sends shutdown command and waits for worker to finish current tasks.
    """
def shutdown_all_workers(args):
    """
    Shut down all active Celery workers.

    Usage: airflow celery shutdown-all-workers [options]

    Parameters (via args object):
    - yes: bool, skip confirmation prompt

    Sends shutdown commands to all discovered workers and monitors
    their termination status.
    """

Commands for managing Celery task queues and worker queue assignments.
def add_queue(args):
    """
    Add a queue to worker's consumption list.

    Usage: airflow celery add-queue [options]

    Parameters (via args object):
    - celery_hostname: str, target worker hostname (required)
    - queues: list[str], queues to add to worker (required)

    Dynamically adds queue to running worker without restart.
    Worker will begin consuming tasks from the new queue.
    """
def remove_queue(args):
    """
    Remove a queue from worker's consumption list.

    Usage: airflow celery remove-queue [options]

    Parameters (via args object):
    - celery_hostname: str, target worker hostname (required)
    - queues: list[str], queues to remove from worker (required)

    Stops worker from consuming new tasks from specified queue.
    Currently processing tasks from that queue will complete normally.
    """

Commands for monitoring and administration of Celery clusters.
def flower(args):
    """
    Start Flower web-based monitoring and administration tool.

    Usage: airflow celery flower [options]

    Parameters (via args object):
    - hostname: str, interface to bind (default: 0.0.0.0)
    - port: int, port number (default: 5555)
    - broker_api: str, broker API URL for advanced monitoring
    - url_prefix: str, URL prefix for reverse proxy setups
    - basic_auth: str, HTTP basic authentication (user:pass,user2:pass2)
    - flower_conf: str, path to Flower configuration file
    - daemon: bool, run as daemon process
    - pidfile: str, PID file path for daemon mode
    - logfile: str, log file path

    Features:
    - Real-time task monitoring and statistics
    - Worker management and control
    - Queue monitoring and manipulation
    - Task routing and execution control
    - Historical task execution data
    """

Supporting functions used by CLI commands.
def _check_if_active_celery_worker(hostname: str) -> bool:
    """
    Check if a Celery worker with given hostname is active.

    Parameters:
    - hostname: str, worker hostname to check

    Returns:
    bool: True if worker is active and responding
    """
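A check like this is typically answered by Celery's broadcast ping (`app.control.inspect().ping()`), which returns `None` when no worker replies and otherwise a mapping of hostname to `{"ok": "pong"}`. A stdlib-only sketch of interpreting that reply shape (`is_worker_active` is a hypothetical helper, not the provider's code):

```python
# Hypothetical helper: interpret the reply mapping returned by
# Celery's app.control.inspect().ping() -- None when nobody answered,
# otherwise {hostname: {"ok": "pong"}} per responding worker.
def is_worker_active(ping_reply, hostname: str) -> bool:
    if not ping_reply:
        return False
    return ping_reply.get(hostname, {}).get("ok") == "pong"

print(is_worker_active({"worker-01@host": {"ok": "pong"}}, "worker-01@host"))  # True
print(is_worker_active(None, "worker-01@host"))  # False
```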
def _serve_logs(skip_serve_logs: bool = False) -> None:
    """
    Start log serving process for worker logs.

    Parameters:
    - skip_serve_logs: bool, whether to skip log serving setup

    Sets up log file serving for accessing worker logs via web interface.
    """
def logger_setup_handler(logger, **kwargs) -> None:
    """
    Configure logging for Celery worker processes.

    Parameters:
    - logger: Logger instance to configure
    - **kwargs: Additional logging configuration parameters

    Sets up appropriate log formatting, handlers, and levels for
    Celery worker processes.
    """

# Start basic worker with default settings
airflow celery worker
# Start worker with specific concurrency
airflow celery worker --concurrency 8
# Start worker consuming from specific queues
airflow celery worker --queues high_priority,default
# Start worker with autoscaling
airflow celery worker --autoscale 16,4
# Start worker as daemon
airflow celery worker --daemon --pid /var/run/celery-worker.pid --log-file /var/log/celery-worker.log
# Start worker with custom hostname
airflow celery worker --celery-hostname worker-gpu-01
# Start worker with debug logging
airflow celery worker --loglevel DEBUG

# List all active workers
airflow celery list-workers
# Stop a worker via its PID file (deprecated; prefer shutdown-worker)
airflow celery stop --pid /var/run/airflow-worker.pid
# Gracefully shutdown worker
airflow celery shutdown-worker --celery-hostname worker-01@hostname
# Shutdown all workers
airflow celery shutdown-all-workers

# Add queue to running worker
airflow celery add-queue --celery-hostname worker-01@hostname --queues ml_training
# Remove queue from worker
airflow celery remove-queue --celery-hostname worker-01@hostname --queues old_queue
# Check queue status (using Flower or custom scripts)
# airflow celery flower --port 5555
# Then access http://localhost:5555 for queue monitoring

# Start Flower with default settings
airflow celery flower
# Start Flower on custom port
airflow celery flower --port 8080
# Start Flower with authentication
airflow celery flower --basic-auth admin:secret,user:password
# Start Flower with URL prefix (for reverse proxy)
airflow celery flower --url-prefix /flower
# Start Flower as daemon
airflow celery flower --daemon --pid /var/run/flower.pid --log-file /var/log/flower.log
# Start Flower with broker API access
airflow celery flower --broker-api http://guest@localhost:15672/api/

#!/bin/bash
# Production worker startup script
# Set environment variables
export AIRFLOW_HOME=/opt/airflow
export PYTHONPATH=/opt/airflow:$PYTHONPATH
# Start worker with production settings
airflow celery worker \
    --concurrency 16 \
    --queues default,high_priority,ml_training \
    --celery-hostname "$(hostname)-worker" \
    --loglevel INFO \
    --daemon \
    --pid /var/run/airflow-worker.pid \
    --log-file /var/log/airflow-worker.log

# Start Flower monitoring
airflow celery flower \
    --port 5555 \
    --basic-auth "admin:$(cat /etc/flower-password)" \
    --daemon \
    --pid /var/run/flower.pid \
    --log-file /var/log/flower.log
echo "Celery worker and Flower started"

#!/bin/bash
# Worker health check script
# Check if the worker process is running
if ! pgrep -f "airflow celery worker" > /dev/null; then
    echo "ERROR: Celery worker not running"
    exit 1
fi

# Check if the worker is responding
WORKER_HOSTNAME="$(hostname)-worker"
if ! airflow celery list-workers | grep -q "$WORKER_HOSTNAME"; then
    echo "ERROR: Worker $WORKER_HOSTNAME not responding"
    exit 1
fi

echo "Worker health check passed"
exit 0

#!/usr/bin/env python3
"""Dynamic queue management script."""
import subprocess

from airflow.providers.celery.executors.celery_executor_utils import app


def scale_workers_for_queue(queue_name: str, target_workers: int):
    """Add or remove workers for a specific queue based on demand."""
    # Find the workers currently consuming from this queue
    inspector = app.control.inspect()
    active_queues = inspector.active_queues()
    current_workers = []
    if active_queues:
        for worker, queues in active_queues.items():
            if any(q.get("name") == queue_name for q in queues):
                current_workers.append(worker)

    current_count = len(current_workers)
    if current_count < target_workers:
        # Start additional daemonized workers for this queue
        for i in range(target_workers - current_count):
            hostname = f"{queue_name}-worker-{current_count + i}"
            subprocess.run(
                [
                    "airflow", "celery", "worker",
                    "--queues", queue_name,
                    "--celery-hostname", hostname,
                    "--daemon",
                ],
                check=True,
            )
            print(f"Started worker {hostname}")
    elif current_count > target_workers:
        # Gracefully shut down the excess workers
        excess = current_count - target_workers
        for worker_to_stop in current_workers[:excess]:
            subprocess.run(
                ["airflow", "celery", "shutdown-worker", "--celery-hostname", worker_to_stop],
                check=True,
            )
            print(f"Stopped worker {worker_to_stop}")


if __name__ == "__main__":
    # Example: scale the ml_training queue to 4 workers
    scale_workers_for_queue("ml_training", 4)

# Dockerfile for Celery worker
FROM apache/airflow:2.8.0
# Install additional dependencies
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt
# Copy DAGs and configuration
COPY dags/ /opt/airflow/dags/
COPY config/ /opt/airflow/config/
# Entrypoint script
COPY docker-entrypoint.sh /docker-entrypoint.sh
RUN chmod +x /docker-entrypoint.sh
ENTRYPOINT ["/docker-entrypoint.sh"]

#!/bin/bash
# docker-entrypoint.sh
if [ "$1" = "worker" ]; then
    # Start a Celery worker
    exec airflow celery worker \
        --concurrency "${WORKER_CONCURRENCY:-16}" \
        --queues "${WORKER_QUEUES:-default}" \
        --celery-hostname "${HOSTNAME}-worker"
elif [ "$1" = "flower" ]; then
    # Start Flower monitoring
    exec airflow celery flower \
        --port "${FLOWER_PORT:-5555}"
else
    # Pass through any other command
    exec "$@"
fi

# docker-compose.yml
version: '3.8'
services:
  worker:
    build: .
    command: worker
    environment:
      - WORKER_CONCURRENCY=8
      - WORKER_QUEUES=default,high_priority
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - redis
      - postgres
  flower:
    build: .
    command: flower
    ports:
      - "5555:5555"
    environment:
      - FLOWER_PORT=5555
    depends_on:
      - redis

CLI commands respect Airflow configuration and environment variables:
# Key configuration sections used by CLI commands (airflow.cfg):

[celery]
worker_concurrency = 16
flower_host = 0.0.0.0
flower_port = 5555
# user:pass,user2:pass2
flower_basic_auth =
# max,min
worker_autoscale =
worker_prefetch_multiplier = 1

[logging]
logging_level = INFO
base_log_folder = /opt/airflow/logs

# Environment variables
AIRFLOW_HOME=/opt/airflow
PYTHONPATH=/opt/airflow
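The basic-auth setting packs multiple credentials into one "user:pass,user2:pass2" string. A small sketch of splitting it apart (`parse_basic_auth` is a hypothetical helper for illustration, not Flower's or the provider's code):

```python
# Hypothetical helper: split a basic-auth value such as
# "user:pass,user2:pass2" into a {user: password} mapping.
def parse_basic_auth(value: str) -> dict[str, str]:
    creds = {}
    for pair in filter(None, value.split(",")):
        user, _, password = pair.partition(":")
        creds[user] = password
    return creds

print(parse_basic_auth("admin:secret,user:password"))
# {'admin': 'secret', 'user': 'password'}
```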