tessl/pypi-apache-airflow-providers-celery

Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow

docs/cli-commands.md

CLI Commands

The Celery provider includes comprehensive command-line tools for managing Celery workers, monitoring with Flower, and performing queue operations. These commands extend Airflow's CLI with Celery-specific functionality.

Capabilities

Worker Management Commands

Commands for starting, stopping, and managing Celery workers.

def worker(args):
    """
    Start a Celery worker process.
    
    Usage: airflow celery worker [options]
    
    Parameters (via args object):
    - concurrency: int, number of concurrent worker processes
    - hostname: str, worker hostname identifier
    - queues: list[str], specific queues for this worker to consume
    - loglevel: str, logging level (DEBUG, INFO, WARNING, ERROR)
    - autoscale: str, autoscaling configuration (max,min)
    - without_gossip: bool, disable gossip for worker discovery
    - without_mingle: bool, disable startup synchronization
    - without_heartbeat: bool, disable worker heartbeat
    - daemon: bool, run as daemon process
    - pid: str, path to PID file for daemon mode
    - log_file: str, path to log file
    
    Environment Setup:
    - Configures worker process name and logging
    - Sets up signal handlers for graceful shutdown
    - Initializes Celery app with Airflow configuration
    """

def stop_worker(args):
    """
    Stop a running Celery worker by PID (deprecated; use shutdown-worker instead).
    
    Usage: airflow celery stop [options]
    
    Parameters (via args object):
    - pid: str, path to PID file of worker to stop
    - verbose: bool, enable verbose output
    
    Reads PID from file and sends termination signal to worker process.
    """

def list_workers(args):
    """
    List all active Celery workers and their status.
    
    Usage: airflow celery list-workers [options]
    
    Parameters (via args object):
    - output: str, output format ('table', 'json', 'yaml', 'plain')
    
    Displays:
    - Worker hostnames and online status
    - Active task counts per worker
    - Queues each worker is consuming from
    - Load averages and resource usage
    """

def shutdown_worker(args):
    """
    Gracefully shutdown a specific Celery worker.
    
    Usage: airflow celery shutdown-worker [options]
    
    Parameters (via args object):
    - celery_hostname: str, worker hostname to shutdown (required)
    
    Sends shutdown command and waits for worker to finish current tasks.
    """

def shutdown_all_workers(args):
    """
    Shutdown all active Celery workers.
    
    Usage: airflow celery shutdown-all-workers [options]
    
    Parameters (via args object):
    - yes: bool, skip confirmation prompt
    
    Sends shutdown commands to all discovered workers and monitors
    their termination status.
    """

Queue Management Commands

Commands for managing Celery task queues and worker queue assignments.

def add_queue(args):
    """
    Add a queue to worker's consumption list.
    
    Usage: airflow celery add-queue [options]
    
    Parameters (via args object):
    - celery_hostname: str, target worker hostname (required)
    - queues: list[str], queues to add to worker (required)
    
    Dynamically adds queue to running worker without restart.
    Worker will begin consuming tasks from the new queue.
    """

def remove_queue(args):
    """
    Remove a queue from worker's consumption list.
    
    Usage: airflow celery remove-queue [options]
    
    Parameters (via args object):
    - celery_hostname: str, target worker hostname (required)
    - queues: list[str], queues to remove from worker (required)
    
    Stops worker from consuming new tasks from specified queue.
    Currently processing tasks from that queue will complete normally.
    """

Monitoring Commands

Commands for monitoring and administration of Celery clusters.

def flower(args):
    """
    Start Flower web-based monitoring and administration tool.
    
    Usage: airflow celery flower [options]
    
    Parameters (via args object):
    - hostname: str, interface to bind (default: 0.0.0.0)
    - port: int, port number (default: 5555)
    - broker_api: str, broker API URL for advanced monitoring
    - url_prefix: str, URL prefix for reverse proxy setups
    - basic_auth: str, HTTP basic authentication (user:pass,user2:pass2)
    - flower_conf: str, path to Flower configuration file
    - daemon: bool, run as daemon process
    - pid: str, PID file path for daemon mode
    - log_file: str, log file path
    
    Features:
    - Real-time task monitoring and statistics
    - Worker management and control
    - Queue monitoring and manipulation
    - Task routing and execution control
    - Historical task execution data
    """

Utility Functions

Supporting functions used by CLI commands.

def _check_if_active_celery_worker(hostname: str) -> bool:
    """
    Check if a Celery worker with given hostname is active.
    
    Parameters:
    - hostname: str, worker hostname to check
    
    Returns:
    bool: True if worker is active and responding
    """

def _serve_logs(skip_serve_logs: bool = False) -> None:
    """
    Start log serving process for worker logs.
    
    Parameters:
    - skip_serve_logs: bool, whether to skip log serving setup
    
    Sets up log file serving for accessing worker logs via web interface.
    """

def logger_setup_handler(logger, **kwargs) -> None:
    """
    Configure logging for Celery worker processes.
    
    Parameters:
    - logger: Logger instance to configure
    - **kwargs: Additional logging configuration parameters
    
    Sets up appropriate log formatting, handlers, and levels for
    Celery worker processes.
    """

Usage Examples

Starting Workers

# Start basic worker with default settings
airflow celery worker

# Start worker with specific concurrency
airflow celery worker --concurrency 8

# Start worker consuming from specific queues
airflow celery worker --queues high_priority,default

# Start worker with autoscaling
airflow celery worker --autoscale 16,4

# Start worker as daemon
airflow celery worker --daemon --pid /var/run/celery-worker.pid --log-file /var/log/celery-worker.log

# Start worker with custom hostname
airflow celery worker --hostname worker-gpu-01

# Start worker with debug logging
airflow celery worker --loglevel DEBUG
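The `--autoscale` value is a single `max,min` string. A hypothetical helper for parsing and sanity-checking it:

```python
# Parse the "--autoscale max,min" value. Illustrative helper only.
def parse_autoscale(value: str):
    max_s, min_s = value.split(",")
    max_n, min_n = int(max_s), int(min_s)
    if min_n > max_n:
        raise ValueError("autoscale min must not exceed max")
    return max_n, min_n
```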

Worker Management

# List all active workers
airflow celery list-workers

# Stop a daemonized worker (reads its PID file)
airflow celery stop --pid /var/run/celery-worker.pid

# Gracefully shut down a specific worker
airflow celery shutdown-worker --celery-hostname worker-01@hostname

# Shut down all workers
airflow celery shutdown-all-workers --yes

Queue Management

# Add a queue to a running worker
airflow celery add-queue --celery-hostname worker-01@hostname --queues ml_training

# Remove a queue from a worker
airflow celery remove-queue --celery-hostname worker-01@hostname --queues old_queue

# Check queue status (using Flower or custom scripts)
# airflow celery flower --port 5555
# Then access http://localhost:5555 for queue monitoring

Monitoring with Flower

# Start Flower with default settings
airflow celery flower

# Start Flower on custom port
airflow celery flower --port 8080

# Start Flower with authentication
airflow celery flower --basic-auth admin:secret,user:password

# Start Flower with URL prefix (for reverse proxy)
airflow celery flower --url-prefix /flower

# Start Flower as daemon
airflow celery flower --daemon --pid /var/run/flower.pid --log-file /var/log/flower.log

# Start Flower with broker API access
airflow celery flower --broker-api http://guest@localhost:15672/api/

Production Deployment Scripts

#!/bin/bash
# Production worker startup script

# Set environment variables
export AIRFLOW_HOME=/opt/airflow
export PYTHONPATH=/opt/airflow:$PYTHONPATH

# Start worker with production settings
airflow celery worker \
    --concurrency 16 \
    --queues default,high_priority,ml_training \
    --hostname $(hostname)-worker \
    --loglevel INFO \
    --daemon \
    --pid /var/run/airflow-worker.pid \
    --log-file /var/log/airflow-worker.log

# Start Flower monitoring
airflow celery flower \
    --port 5555 \
    --basic-auth admin:$(cat /etc/flower-password) \
    --daemon \
    --pid /var/run/flower.pid \
    --log-file /var/log/flower.log

echo "Celery worker and Flower started"

Health Check Scripts

#!/bin/bash
# Worker health check script

# Check if worker process is running
if ! pgrep -f "airflow celery worker" > /dev/null; then
    echo "ERROR: Celery worker not running"
    exit 1
fi

# Check if worker is responding
WORKER_HOSTNAME=$(hostname)-worker
if ! airflow celery list-workers | grep -q "$WORKER_HOSTNAME"; then
    echo "ERROR: Worker $WORKER_HOSTNAME not responding"
    exit 1
fi

echo "Worker health check passed"
exit 0

Queue Scaling Scripts

#!/usr/bin/env python3
"""Dynamic queue management script."""

import subprocess

from airflow.providers.celery.executors.celery_executor_utils import app

def scale_workers_for_queue(queue_name: str, target_workers: int):
    """Add or remove workers for a specific queue based on demand."""
    
    # Get current workers for this queue
    inspector = app.control.inspect()
    active_queues = inspector.active_queues()
    
    current_workers = []
    if active_queues:
        for worker, queues in active_queues.items():
            if any(q.get('name') == queue_name for q in queues):
                current_workers.append(worker)
    
    current_count = len(current_workers)
    
    if current_count < target_workers:
        # Need to add workers
        needed = target_workers - current_count
        for i in range(needed):
            subprocess.run([
                'airflow', 'celery', 'worker',
                '--queues', queue_name,
                '--hostname', f'{queue_name}-worker-{i}',
                '--daemon'
            ])
            print(f"Started worker {queue_name}-worker-{i}")
    
    elif current_count > target_workers:
        # Need to remove workers
        excess = current_count - target_workers
        for i in range(excess):
            worker_to_stop = current_workers[i]
            subprocess.run([
                'airflow', 'celery', 'shutdown-worker',
                '--celery-hostname', worker_to_stop,
            ])
            print(f"Stopped worker {worker_to_stop}")

if __name__ == "__main__":
    # Example: Scale ml_training queue to 4 workers
    scale_workers_for_queue('ml_training', 4)

Docker Integration

# Dockerfile for Celery worker
FROM apache/airflow:2.8.0

# Install additional dependencies
COPY requirements.txt /requirements.txt
RUN pip install -r /requirements.txt

# Copy DAGs and configuration
COPY dags/ /opt/airflow/dags/
COPY config/ /opt/airflow/config/

# Entrypoint script
COPY docker-entrypoint.sh /docker-entrypoint.sh
RUN chmod +x /docker-entrypoint.sh

ENTRYPOINT ["/docker-entrypoint.sh"]

#!/bin/bash
# docker-entrypoint.sh

# Initialize Airflow database (if needed)
if [ "$1" = "worker" ]; then
    # Start Celery worker
    exec airflow celery worker \
        --concurrency ${WORKER_CONCURRENCY:-16} \
        --queues ${WORKER_QUEUES:-default} \
        --hostname ${HOSTNAME}-worker
elif [ "$1" = "flower" ]; then
    # Start Flower monitoring
    exec airflow celery flower \
        --port ${FLOWER_PORT:-5555}
else
    # Pass through other commands
    exec "$@"
fi

# docker-compose.yml
version: '3.8'
services:
  worker:
    build: .
    command: worker
    environment:
      - WORKER_CONCURRENCY=8
      - WORKER_QUEUES=default,high_priority
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
    depends_on:
      - redis
      - postgres

  flower:
    build: .
    command: flower
    ports:
      - "5555:5555"
    environment:
      - FLOWER_PORT=5555
    depends_on:
      - redis

Configuration

CLI commands respect Airflow configuration and environment variables:

# Key configuration sections used by the CLI commands (airflow.cfg):

[celery]
worker_concurrency = 16
flower_host = 0.0.0.0
flower_port = 5555
# user:pass,user2:pass2
flower_basic_auth =
# max,min
worker_autoscale =
worker_prefetch_multiplier = 1

[logging]
logging_level = INFO
base_log_folder = /opt/airflow/logs

# Environment variables
AIRFLOW_HOME=/opt/airflow
PYTHONPATH=/opt/airflow
