CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-apache-airflow-providers-celery

Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

configuration.mddocs/

Configuration

The Celery provider offers extensive configuration options to customize broker connections, worker behavior, monitoring, security, and performance tuning. Configuration is managed through Airflow's standard configuration system.

Capabilities

Core Celery Configuration

Primary configuration section for Celery executor behavior.

# Configuration section: [celery]

CELERY_APP_NAME = "airflow.providers.celery.executors.celery_executor"
"""str: The app name that will be used by Celery."""

WORKER_CONCURRENCY = 16
"""int: Number of concurrent worker processes. Size based on resources and task nature."""

WORKER_AUTOSCALE = ""
"""str: Auto-scaling configuration (max,min). Example: "16,4" for max 16, min 4 processes."""

WORKER_PREFETCH_MULTIPLIER = 1
"""int: Number of tasks a worker prefetches. Higher values can cause blocking."""

WORKER_ENABLE_REMOTE_CONTROL = True
"""bool: Enable remote control of workers. Required for Flower, creates reply queues."""

SYNC_PARALLELISM = 0
"""int: Processes for syncing task state. 0 means max(1, cpu_count - 1)."""

OPERATION_TIMEOUT = 1.0
"""float: Timeout in seconds for send_task_to_executor and fetch_celery_task_state."""

TASK_ACKS_LATE = True
"""bool: Wait until task finishes before allowing reassignment. Overrides visibility timeout."""

TASK_TRACK_STARTED = True
"""bool: Report task status as 'started' when executed. Used for orphan task adoption."""

TASK_PUBLISH_MAX_RETRIES = 3
"""int: Maximum retries for publishing task messages before marking as failed."""

CELERY_CONFIG_OPTIONS = "airflow.providers.celery.executors.default_celery.DEFAULT_CELERY_CONFIG"
"""str: Import path for Celery configuration options."""

POOL = "prefork"
"""str: Celery pool implementation. Options: prefork, eventlet, gevent, solo."""

WORKER_UMASK = ""
"""str: Default umask for worker daemon mode. Octal integer format."""

EXTRA_CELERY_CONFIG = "{}"
"""str: JSON string of additional Celery config. Example: '{"worker_max_tasks_per_child": 10}'"""

Broker and Backend Configuration

Configuration for message broker and result backend connections.

# Broker configuration
BROKER_URL = "redis://redis:6379/0"
"""str: Celery broker URL. Supports RabbitMQ, Redis, and SQL databases."""

RESULT_BACKEND = ""
"""str: Result backend URL. When empty, uses sql_alchemy_conn with db+ prefix."""

RESULT_BACKEND_SQLALCHEMY_ENGINE_OPTIONS = ""
"""str: JSON config for result backend SQLAlchemy engine. Example: '{"pool_recycle": 1800}'"""

# Example broker URLs:
# Redis: "redis://localhost:6379/0"
# RabbitMQ: "amqp://guest@localhost:5672//"
# Redis with password: "redis://:password@localhost:6379/0"
# Redis Sentinel: "sentinel://localhost:26379/0;sentinel://localhost:26380/0"

SSL Configuration

Security configuration for encrypted broker connections.

SSL_ACTIVE = False
"""bool: Enable SSL for broker connections."""

SSL_KEY = ""
"""str: Path to client private key file."""

SSL_CERT = ""
"""str: Path to client certificate file."""

SSL_CACERT = ""
"""str: Path to CA certificate file for server verification."""

# Example SSL configuration:
# ssl_active = True
# ssl_key = /path/to/client-key.pem
# ssl_cert = /path/to/client-cert.pem  
# ssl_cacert = /path/to/ca-cert.pem

Flower Monitoring Configuration

Configuration for the Flower web monitoring interface.

FLOWER_HOST = "0.0.0.0"
"""str: IP address Flower binds to."""

FLOWER_PORT = 5555
"""int: Port number for Flower web interface."""

FLOWER_URL_PREFIX = ""  
"""str: URL prefix for reverse proxy setups. Example: "/flower" """

FLOWER_BASIC_AUTH = ""
"""str: HTTP basic auth credentials. Format: "user1:pass1,user2:pass2" """

# Example Flower URLs:
# Direct access: http://localhost:5555
# With prefix: http://localhost/flower (when url_prefix = "/flower")
# With auth: http://admin:secret@localhost:5555

Broker Transport Options

Advanced broker-specific transport configuration.

# Configuration section: [celery_broker_transport_options]

VISIBILITY_TIMEOUT = ""
"""str: Seconds to wait for worker acknowledgment before redelivery. Redis/SQS only."""

SENTINEL_KWARGS = ""
"""str: JSON config for Redis Sentinel. Example: '{"password": "redis_password"}'"""

# Example visibility timeout calculation:
# Set to match longest task execution time + buffer
# For 2-hour tasks: visibility_timeout = 7200 + 600 = 7800

# Example Sentinel configuration:
# For password-protected Redis Sentinel:
# sentinel_kwargs = {"password": "redis_server_password", "socket_timeout": 0.1}

Hybrid Executor Configuration

Configuration for CeleryKubernetesExecutor routing behavior.

# Configuration section: [celery_kubernetes_executor]

KUBERNETES_QUEUE = "kubernetes"
"""str: Queue name that routes tasks to KubernetesExecutor. All others use CeleryExecutor."""

# Routing examples:
# Task with queue="kubernetes" -> KubernetesExecutor
# Task with queue="default" -> CeleryExecutor  
# Task with queue="high_memory" -> CeleryExecutor
# Task with queue="gpu" -> CeleryExecutor

Configuration Examples

Production Configuration

# airflow.cfg production example

[core]
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor

[celery]
# Worker configuration
worker_concurrency = 16
worker_autoscale = 32,8
worker_prefetch_multiplier = 1
worker_enable_remote_control = true
sync_parallelism = 4

# Task behavior
task_acks_late = true
task_track_started = true
task_publish_max_retries = 3
operation_timeout = 2.0

# Broker and backend
broker_url = redis://:${REDIS_PASSWORD}@redis-cluster:6379/0
result_backend = db+postgresql://airflow:${DB_PASSWORD}@postgres:5432/airflow
result_backend_sqlalchemy_engine_options = {"pool_recycle": 1800, "pool_pre_ping": true}

# Performance tuning
pool = prefork
extra_celery_config = {"worker_max_tasks_per_child": 1000, "worker_disable_rate_limits": true}

# Flower monitoring  
flower_host = 0.0.0.0
flower_port = 5555
flower_basic_auth = admin:${FLOWER_PASSWORD}

[celery_broker_transport_options]
# 6 hours for long-running tasks (inline comments are not stripped by airflow.cfg parsing)
visibility_timeout = 21600

Development Configuration

# airflow.cfg development example

[core]
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor

[celery]
# Simple development setup
worker_concurrency = 4
broker_url = redis://localhost:6379/0
result_backend = db+sqlite:///airflow.db

# Debug settings
worker_enable_remote_control = true
sync_parallelism = 1
operation_timeout = 5.0

# Flower for monitoring
flower_host = 127.0.0.1
flower_port = 5555
# No authentication for development

High Availability Configuration

# airflow.cfg HA production example

[core]
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor

[celery]
# HA worker configuration
worker_concurrency = 24
worker_autoscale = 48,12
sync_parallelism = 8
task_acks_late = true
operation_timeout = 3.0

# Redis Sentinel for HA — semicolon-separated sentinel:// URLs;
# set the master name via master_name in [celery_broker_transport_options]
broker_url = sentinel://sentinel1:26379/0;sentinel://sentinel2:26379/0;sentinel://sentinel3:26379/0
result_backend = db+postgresql://airflow:pass@pg-primary:5432,pg-replica:5432/airflow?target_session_attrs=read-write

# SSL security
ssl_active = true
ssl_cert = /etc/ssl/certs/airflow-client.pem
ssl_key = /etc/ssl/private/airflow-client.key
ssl_cacert = /etc/ssl/certs/ca-bundle.pem

[celery_broker_transport_options]
# Sentinel configuration
sentinel_kwargs = {"password": "sentinel_password", "socket_timeout": 0.5}
# 12 hours
visibility_timeout = 43200

Hybrid Executor Configuration

# airflow.cfg hybrid executor example

[core]
executor = airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor

[celery_kubernetes_executor]
kubernetes_queue = k8s_queue

[celery]
# Celery worker configuration for persistent tasks
worker_concurrency = 16
broker_url = redis://redis:6379/0
result_backend = db+postgresql://postgres:5432/airflow

[kubernetes_executor]
# Kubernetes configuration for ephemeral tasks  
namespace = airflow
worker_container_repository = airflow/workers
worker_container_tag = latest
delete_worker_pods = true

Environment Variables

Configuration can be overridden using environment variables:

# Broker configuration
export AIRFLOW__CELERY__BROKER_URL="redis://redis:6379/0"
export AIRFLOW__CELERY__RESULT_BACKEND="db+postgresql://user:pass@db:5432/airflow"

# Worker configuration
export AIRFLOW__CELERY__WORKER_CONCURRENCY=32
export AIRFLOW__CELERY__WORKER_AUTOSCALE="64,16"

# Security
export AIRFLOW__CELERY__SSL_ACTIVE=true
export AIRFLOW__CELERY__SSL_CERT="/etc/ssl/client.pem"

# Monitoring
export AIRFLOW__CELERY__FLOWER_BASIC_AUTH="admin:${FLOWER_PASSWORD}"

# Transport options
export AIRFLOW__CELERY_BROKER_TRANSPORT_OPTIONS__VISIBILITY_TIMEOUT=21600

Dynamic Configuration

Configuration can be modified programmatically:

from airflow.configuration import conf

# Read current configuration
worker_concurrency = conf.getint('celery', 'worker_concurrency')
broker_url = conf.get('celery', 'broker_url')

# Configuration validation
def validate_celery_config():
    """Validate Celery configuration for common issues."""

    # A broker URL is mandatory for the Celery executor to enqueue tasks.
    if not conf.get('celery', 'broker_url'):
        raise ValueError("Celery broker_url not configured")

    # A worker must run at least one process.
    if conf.getint('celery', 'worker_concurrency') < 1:
        raise ValueError("Worker concurrency must be >= 1")

    # With no explicit result backend, Airflow falls back to the SQL connection.
    if not conf.get('celery', 'result_backend', fallback=None):
        print("Warning: No result backend configured, using SQL connection")

    # SSL is only usable when both the client certificate and key are set.
    if conf.getboolean('celery', 'ssl_active'):
        have_cert = conf.get('celery', 'ssl_cert')
        have_key = conf.get('celery', 'ssl_key')
        if not (have_cert and have_key):
            raise ValueError("SSL active but cert/key not configured")

# Custom configuration loading
def load_celery_config_from_file(config_path: str):
    """Load additional Celery configuration from external file.

    Merges the JSON document at ``config_path`` onto the current
    ``extra_celery_config`` setting and returns the result as a JSON string.
    """
    import json

    # Overriding options come from the external JSON file.
    with open(config_path, 'r') as f:
        overrides = json.load(f)

    # Start from whatever extra_celery_config already holds ('{}' when unset),
    # letting the file's keys win on collision.
    merged = json.loads(conf.get('celery', 'extra_celery_config', fallback='{}'))
    merged.update(overrides)

    return json.dumps(merged)

Performance Tuning

Worker Performance

# Optimize worker performance based on workload

# CPU-intensive tasks
worker_concurrency = cpu_count()  # Match CPU cores
pool = "prefork"  # Best for CPU-bound work
worker_prefetch_multiplier = 1  # Prevent task hoarding

# I/O-intensive tasks  
worker_concurrency = cpu_count() * 2  # More processes than cores
pool = "eventlet"  # or "gevent" — async pool for I/O
worker_prefetch_multiplier = 4  # Can prefetch more tasks

# Memory-intensive tasks
worker_concurrency = max(1, cpu_count() // 2)  # Fewer processes
extra_celery_config = {"worker_max_tasks_per_child": 100}  # Prevent memory leaks

# Mixed workloads
worker_autoscale = "32,8"  # Scale based on demand
sync_parallelism = max(1, cpu_count() - 1)  # Efficient state syncing

Broker Performance

# Redis optimization
broker_url = "redis://redis:6379/0"
# The following go in the [celery_broker_transport_options] section:
visibility_timeout = 21600  # Match longest task duration
fanout_prefix = true  # Efficient broadcast
fanout_patterns = true  # Pattern-based routing

# RabbitMQ optimization  
broker_url = "amqp://guest@rabbitmq:5672//"
extra_celery_config = {
    "broker_pool_limit": 10,  # Connection pool size
    "broker_connection_retry_on_startup": true,
    "broker_transport_options": {
        "priority_steps": [0, 3, 6, 9],  # Task priorities
        "queue_order_strategy": "priority"
    }
}

Monitoring Configuration

# Enable comprehensive monitoring
task_track_started = true  # Track task start times
worker_enable_remote_control = true  # Enable Flower integration
sync_parallelism = 4  # Efficient state synchronization

# Flower monitoring optimization
flower_basic_auth = "admin:secure_password"  # Security
flower_url_prefix = "/flower"  # Reverse proxy integration

# Custom monitoring hooks
extra_celery_config = {
    "task_send_sent_event": true,  # Track task sending
    "worker_send_task_events": true,  # Detailed worker events
    "task_ignore_result": false,  # Store all results
    "result_expires": 86400  # Results expire after 1 day
}

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-celery

docs

celery-executor.md

celery-kubernetes-executor.md

cli-commands.md

configuration.md

index.md

queue-monitoring.md

tile.json