Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow
The Celery provider offers extensive configuration options to customize broker connections, worker behavior, monitoring, security, and performance tuning. Configuration is managed through Airflow's standard configuration system.
Primary configuration section for Celery executor behavior.
# Configuration section: [celery]
CELERY_APP_NAME = "airflow.providers.celery.executors.celery_executor"
"""str: The app name that will be used by Celery."""
WORKER_CONCURRENCY = 16
"""int: Number of concurrent worker processes. Size based on resources and task nature."""
WORKER_AUTOSCALE = ""
"""str: Auto-scaling configuration (max,min). Example: "16,4" for max 16, min 4 processes."""
WORKER_PREFETCH_MULTIPLIER = 1
"""int: Number of tasks a worker prefetches. Higher values can cause blocking."""
WORKER_ENABLE_REMOTE_CONTROL = True
"""bool: Enable remote control of workers. Required for Flower, creates reply queues."""
SYNC_PARALLELISM = 0
"""int: Processes for syncing task state. 0 means max(1, cpu_count - 1)."""
OPERATION_TIMEOUT = 1.0
"""float: Timeout in seconds for send_task_to_executor and fetch_celery_task_state."""
TASK_ACKS_LATE = True
"""bool: Wait until task finishes before allowing reassignment. Overrides visibility timeout."""
TASK_TRACK_STARTED = True
"""bool: Report task status as 'started' when executed. Used for orphan task adoption."""
TASK_PUBLISH_MAX_RETRIES = 3
"""int: Maximum retries for publishing task messages before marking as failed."""
CELERY_CONFIG_OPTIONS = "airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG"
"""str: Import path for Celery configuration options."""
POOL = "prefork"
"""str: Celery pool implementation. Options: prefork, eventlet, gevent, solo."""
WORKER_UMASK = ""
"""str: Default umask for worker daemon mode. Octal integer format."""
EXTRA_CELERY_CONFIG = "{}"
"""str: JSON string of additional Celery config. Example: '{"worker_max_tasks_per_child": 10}'"""Configuration for message broker and result backend connections.
# Broker configuration
BROKER_URL = "redis://redis:6379/0"
"""str: Celery broker URL. Supports RabbitMQ, Redis, and SQL databases."""
RESULT_BACKEND = ""
"""str: Result backend URL. When empty, uses sql_alchemy_conn with db+ prefix."""
RESULT_BACKEND_SQLALCHEMY_ENGINE_OPTIONS = ""
"""str: JSON config for result backend SQLAlchemy engine. Example: '{"pool_recycle": 1800}'"""
# Example broker URLs:
# Redis: "redis://localhost:6379/0"
# RabbitMQ: "amqp://guest@localhost:5672//"
# Redis with password: "redis://:password@localhost:6379/0"
# Redis Sentinel: "sentinel://localhost:26379/0;sentinel://localhost:26380/0"Security configuration for encrypted broker connections.
SSL_ACTIVE = False
"""bool: Enable SSL for broker connections."""
SSL_KEY = ""
"""str: Path to client private key file."""
SSL_CERT = ""
"""str: Path to client certificate file."""
SSL_CACERT = ""
"""str: Path to CA certificate file for server verification."""
# Example SSL configuration:
# ssl_active = True
# ssl_key = /path/to/client-key.pem
# ssl_cert = /path/to/client-cert.pem
# ssl_cacert = /path/to/ca-cert.pem
Configuration for the Flower web monitoring interface.
FLOWER_HOST = "0.0.0.0"
"""str: IP address Flower binds to."""
FLOWER_PORT = 5555
"""int: Port number for Flower web interface."""
FLOWER_URL_PREFIX = ""
"""str: URL prefix for reverse proxy setups. Example: "/flower" """
FLOWER_BASIC_AUTH = ""
"""str: HTTP basic auth credentials. Format: "user1:pass1,user2:pass2" """
# Example Flower URLs:
# Direct access: http://localhost:5555
# With prefix: http://localhost/flower (when url_prefix = "/flower")
# With auth: http://admin:secret@localhost:5555
Advanced broker-specific transport configuration.
# Configuration section: [celery_broker_transport_options]
VISIBILITY_TIMEOUT = ""
"""str: Seconds to wait for worker acknowledgment before redelivery. Redis/SQS only."""
SENTINEL_KWARGS = ""
"""str: JSON config for Redis Sentinel. Example: '{"password": "redis_password"}'"""
# Example visibility timeout calculation:
# Set to match longest task execution time + buffer
# For 2-hour tasks: visibility_timeout = 7200 + 600 = 7800
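The calculation above can be expressed as a small helper (the name and the 600-second buffer are illustrative choices, not Airflow defaults):

```python
def suggested_visibility_timeout(longest_task_seconds: int, buffer_seconds: int = 600) -> int:
    # Redelivery should only kick in after the longest task could have finished.
    return longest_task_seconds + buffer_seconds

suggested_visibility_timeout(7200)  # 7800, matching the 2-hour example above
```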
# Example Sentinel configuration:
# For password-protected Redis Sentinel:
# sentinel_kwargs = {"password": "redis_server_password", "socket_timeout": 0.1}Configuration for CeleryKubernetesExecutor routing behavior.
# Configuration section: [celery_kubernetes_executor]
KUBERNETES_QUEUE = "kubernetes"
"""str: Queue name that routes tasks to KubernetesExecutor. All others use CeleryExecutor."""
# Routing examples:
# Task with queue="kubernetes" -> KubernetesExecutor
# Task with queue="default" -> CeleryExecutor
# Task with queue="high_memory" -> CeleryExecutor
# Task with queue="gpu" -> CeleryExecutor# airflow.cfg production example
[core]
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor
[celery]
# Worker configuration
worker_concurrency = 16
worker_autoscale = 32,8
worker_prefetch_multiplier = 1
worker_enable_remote_control = true
sync_parallelism = 4
# Task behavior
task_acks_late = true
task_track_started = true
task_publish_max_retries = 3
operation_timeout = 2.0
# Broker and backend
broker_url = redis://:${REDIS_PASSWORD}@redis-cluster:6379/0
result_backend = db+postgresql://airflow:${DB_PASSWORD}@postgres:5432/airflow
result_backend_sqlalchemy_engine_options = {"pool_recycle": 1800, "pool_pre_ping": true}
# Performance tuning
pool = prefork
extra_celery_config = {"worker_max_tasks_per_child": 1000, "worker_disable_rate_limits": true}
# Flower monitoring
flower_host = 0.0.0.0
flower_port = 5555
flower_basic_auth = admin:${FLOWER_PASSWORD}
[celery_broker_transport_options]
visibility_timeout = 21600  # 6 hours for long-running tasks
# airflow.cfg development example
[core]
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor
[celery]
# Simple development setup
worker_concurrency = 4
broker_url = redis://localhost:6379/0
result_backend = db+sqlite:///airflow.db
# Debug settings
worker_enable_remote_control = true
sync_parallelism = 1
operation_timeout = 5.0
# Flower for monitoring
flower_host = 127.0.0.1
flower_port = 5555
# No authentication for development
# airflow.cfg HA production example
[core]
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor
[celery]
# HA worker configuration
worker_concurrency = 24
worker_autoscale = 48,12
sync_parallelism = 8
task_acks_late = true
operation_timeout = 3.0
# Redis Sentinel for HA
# Master name ("mymaster") belongs in master_name under [celery_broker_transport_options]
broker_url = sentinel://sentinel1:26379/0;sentinel://sentinel2:26379/0;sentinel://sentinel3:26379/0
result_backend = db+postgresql://airflow:pass@pg-primary:5432,pg-replica:5432/airflow?target_session_attrs=read-write
# SSL security
ssl_active = true
ssl_cert = /etc/ssl/certs/airflow-client.pem
ssl_key = /etc/ssl/private/airflow-client.key
ssl_cacert = /etc/ssl/certs/ca-bundle.pem
[celery_broker_transport_options]
# Sentinel configuration: master name plus connection options
master_name = mymaster
sentinel_kwargs = {"password": "sentinel_password", "socket_timeout": 0.5}
visibility_timeout = 43200  # 12 hours
# airflow.cfg hybrid executor example
[core]
executor = airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor
[celery_kubernetes_executor]
kubernetes_queue = k8s_queue
[celery]
# Celery worker configuration for persistent tasks
worker_concurrency = 16
broker_url = redis://redis:6379/0
result_backend = db+postgresql://postgres:5432/airflow
[kubernetes_executor]
# Kubernetes configuration for ephemeral tasks
namespace = airflow
worker_container_repository = airflow/workers
worker_container_tag = latest
delete_worker_pods = true
Configuration can be overridden using environment variables:
# Broker configuration
export AIRFLOW__CELERY__BROKER_URL="redis://redis:6379/0"
export AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://user:pass@db:5432/airflow"
# Worker configuration
export AIRFLOW__CELERY__WORKER_CONCURRENCY=32
export AIRFLOW__CELERY__WORKER_AUTOSCALE="64,16"
# Security
export AIRFLOW__CELERY__SSL_ACTIVE=true
export AIRFLOW__CELERY__SSL_CERT="/etc/ssl/client.pem"
# Monitoring
export AIRFLOW__CELERY__FLOWER_BASIC_AUTH="admin:${FLOWER_PASSWORD}"
# Transport options
export AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT=21600
Configuration can be modified programmatically:
from airflow.configuration import conf
# Read current configuration
worker_concurrency = conf.getint('celery', 'worker_concurrency')
broker_url = conf.get('celery', 'broker_url')
# Configuration validation
def validate_celery_config():
    """Validate Celery configuration for common issues."""
    # Check broker connectivity
    broker_url = conf.get('celery', 'broker_url')
    if not broker_url:
        raise ValueError("Celery broker_url not configured")
    # Validate worker settings
    concurrency = conf.getint('celery', 'worker_concurrency')
    if concurrency < 1:
        raise ValueError("Worker concurrency must be >= 1")
    # Check result backend
    result_backend = conf.get('celery', 'result_backend', fallback=None)
    if not result_backend:
        print("Warning: No result backend configured, using SQL connection")
    # Validate SSL configuration
    if conf.getboolean('celery', 'ssl_active'):
        ssl_cert = conf.get('celery', 'ssl_cert')
        ssl_key = conf.get('celery', 'ssl_key')
        if not ssl_cert or not ssl_key:
            raise ValueError("SSL active but cert/key not configured")
# Custom configuration loading
def load_celery_config_from_file(config_path: str):
    """Load additional Celery configuration from an external JSON file."""
    import json
    with open(config_path, 'r') as f:
        extra_config = json.load(f)
    # Merge with the current extra_celery_config JSON
    current_extra = conf.get('celery', 'extra_celery_config', fallback='{}')
    current_dict = json.loads(current_extra)
    current_dict.update(extra_config)
    # Return the merged configuration as a JSON string
    return json.dumps(current_dict)
# Optimize worker performance based on workload
# CPU-intensive tasks
worker_concurrency = cpu_count() # Match CPU cores
pool = "prefork" # Best for CPU-bound work
worker_prefetch_multiplier = 1 # Prevent task hoarding
# I/O-intensive tasks
worker_concurrency = cpu_count() * 2 # More processes than cores
pool = "eventlet" or "gevent" # Async pool for I/O
worker_prefetch_multiplier = 4 # Can prefetch more tasks
# Memory-intensive tasks
worker_concurrency = max(1, cpu_count() // 2) # Fewer processes
extra_celery_config = {"worker_max_tasks_per_child": 100} # Prevent memory leaks
# Mixed workloads
worker_autoscale = "32,8" # Scale based on demand
sync_parallelism = max(1, cpu_count() - 1)  # Efficient state syncing
# Redis optimization
broker_url = "redis://redis:6379/0"
celery_broker_transport_options = {
"visibility_timeout": 21600, # Match longest task duration
"fanout_prefix": true, # Efficient broadcast
"fanout_patterns": true # Pattern-based routing
}
# RabbitMQ optimization
broker_url = "amqp://guest@rabbitmq:5672//"
extra_celery_config = {
"broker_pool_limit": 10, # Connection pool size
"broker_connection_retry_on_startup": true,
"broker_transport_options": {
"priority_steps": [0, 3, 6, 9], # Task priorities
"queue_order_strategy": "priority"
}
}# Enable comprehensive monitoring
task_track_started = true            # Track task start times
worker_enable_remote_control = true  # Enable Flower integration
sync_parallelism = 4                 # Efficient state synchronization
# Flower monitoring optimization
flower_basic_auth = admin:secure_password  # Security
flower_url_prefix = /flower                # Reverse proxy integration
# Custom monitoring hooks: send task/worker events, store all results, expire them after 1 day
extra_celery_config = {"task_send_sent_event": true, "worker_send_task_events": true, "task_ignore_result": false, "result_expires": 86400}
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-celery