tessl/pypi-apache-airflow-providers-celery

Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow

docs/celery-executor.md

Celery Executor

The CeleryExecutor is the primary distributed task executor that uses Celery to distribute Airflow tasks across multiple worker nodes. It provides horizontal scalability, fault tolerance, and load distribution for production Airflow deployments.

Capabilities

CeleryExecutor Class

Main executor class implementing Airflow's BaseExecutor interface for Celery-based task distribution.

class CeleryExecutor(BaseExecutor):
    """
    Celery executor for distributed task execution.
    
    This executor sends tasks to Celery workers running on multiple machines,
    providing horizontal scalability and fault tolerance.
    """
    
    # Class attributes
    supports_ad_hoc_ti_run: bool = True
    supports_sentry: bool = True
    supports_pickling: bool = True
    is_local: bool = False
    is_single_threaded: bool = False
    is_production: bool = True
    serve_logs: bool = False
    callback_sink: BaseCallbackSink | None = None
    
    # For Airflow 3.0+
    queued_tasks: dict[TaskInstanceKey, workloads.All]
    
    def __init__(self) -> None:
        """
        Initialize the CeleryExecutor.
        
        Sets up the executor with default configuration and prepares
        for Celery app initialization.
        """
    
    def start(self) -> None:
        """
        Start the executor and initialize Celery app connection.
        
        Initializes the Celery application, sets up task queues, and prepares
        for task submission.
        """
    
    def queue_workload(self, workload: workloads.All, session: Session | None = None) -> None:
        """
        Queue a workload for execution via Celery (Airflow 3.0+).
        
        Parameters:
        - workload: workloads.All, the workload to execute (typically ExecuteTask)
        - session: Session | None, optional database session
        """
    
    def sync(self) -> None:
        """
        Synchronize task states between Airflow and Celery.
        
        Fetches task states from Celery workers and updates Airflow's
        task instance states accordingly.
        """
    
    def debug_dump(self) -> None:
        """
        Print debug information about current executor state.
        
        Logs information about running tasks, queued tasks, and
        executor configuration for debugging purposes.
        """
    
    def update_all_task_states(self) -> None:
        """
        Update states for all tracked tasks.
        
        Queries Celery for current states of all running tasks
        and updates the local task state tracking.
        """
    
    def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None, remove_running: bool = True) -> None:
        """
        Change the state of a specific task instance.
        
        Parameters:
        - key: TaskInstanceKey, unique identifier for the task instance
        - state: TaskInstanceState, new state for the task
        - info: Any, optional additional state information
        - remove_running: bool, whether to remove from running tasks set
        """
    
    def update_task_state(self, key: TaskInstanceKey, state: str, info: Any) -> None:
        """
        Update the state of a task instance.
        
        Parameters:
        - key: TaskInstanceKey, unique identifier for the task instance
        - state: str, new state as string
        - info: Any, additional state information
        """
    
    def end(self, synchronous: bool = False) -> None:
        """
        Clean shutdown of the executor.
        
        Parameters:
        - synchronous: bool, whether to wait for all tasks to complete
        
        Waits for running tasks to complete and gracefully shuts down
        the Celery connection.
        """
    
    def terminate(self) -> None:
        """
        Force termination of all running tasks.
        
        Immediately terminates all submitted tasks and shuts down
        the executor.
        """
    
    def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
        """
        Try to adopt orphaned task instances.
        
        Parameters:
        - tis: Sequence[TaskInstance], task instances to attempt to adopt
        
        Returns:
        Sequence[TaskInstance]: task instances that could not be adopted
        """
    
    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
        """
        Clean up task instances stuck in queued state (deprecated).
        
        Parameters:
        - tis: list[TaskInstance], task instances to clean up
        
        Returns:
        list[str]: task instance keys that were cleaned up
        """
    
    def revoke_task(self, *, ti: TaskInstance) -> None:
        """
        Revoke a running task.
        
        Parameters:
        - ti: TaskInstance, task instance to revoke
        
        Sends a revoke signal to Celery to terminate the task.
        """
    
    @staticmethod
    def get_cli_commands() -> list[GroupCommand]:
        """
        Get CLI commands provided by this executor.
        
        Returns:
        list[GroupCommand]: list of CLI command groups
        """

Task Submission Functions

Utility functions for sending tasks to Celery workers and fetching task states.

def send_task_to_executor(task_tuple: TaskInstanceInCelery) -> tuple[TaskInstanceKey, CommandType, AsyncResult | ExceptionWithTraceback]:
    """
    Send a task to the Celery executor.
    
    Parameters:
    - task_tuple: TaskInstanceInCelery, tuple containing (TaskInstance, CommandType)
    
    Returns:
    tuple containing:
    - TaskInstanceKey: unique identifier for the task instance
    - CommandType: the command that was executed
    - AsyncResult | ExceptionWithTraceback: Celery result or exception
    """

def fetch_celery_task_state(async_result: AsyncResult) -> tuple[str, str | ExceptionWithTraceback, Any]:
    """
    Fetch the current state of a Celery task.
    
    Parameters:
    - async_result: AsyncResult, Celery result object
    
    Returns:
    tuple containing:
    - str: Task state (PENDING, SUCCESS, FAILURE, etc.)
    - str | ExceptionWithTraceback: Result data or exception info
    - Any: Additional task metadata
    """
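As the return value above suggests, Celery reports states such as PENDING, STARTED, SUCCESS, and FAILURE, which the executor must translate into Airflow task-instance states. A minimal sketch of that translation (the mapping table and helper name are illustrative, not the provider's actual code, which performs this inside `update_task_state`):

```python
# Illustrative mapping from Celery's standard task states to
# Airflow-style task-instance states.
CELERY_TO_AIRFLOW_STATE = {
    "PENDING": "queued",
    "STARTED": "running",
    "RETRY": "up_for_retry",
    "SUCCESS": "success",
    "FAILURE": "failed",
    "REVOKED": "failed",
}

def map_celery_state(celery_state: str) -> str:
    """Translate a Celery task state into an Airflow-style task state."""
    try:
        return CELERY_TO_AIRFLOW_STATE[celery_state]
    except KeyError:
        # An unrecognized state indicates a bug or a new Celery version
        raise ValueError(f"unknown Celery state: {celery_state}")
```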

Task Execution

Core functions for executing workloads within Celery workers.

def execute_workload(input: str) -> None:
    """
    Execute an Airflow workload within a Celery worker (Airflow 3.0+).
    
    This function is called by Celery workers to execute the actual
    Airflow task. It deserializes the workload and runs the task.
    
    Parameters:
    - input: str, serialized workload containing task execution parameters
    
    Raises:
    - AirflowTaskTimeout: If task execution exceeds timeout
    - Exception: Any exception raised during task execution
    """
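The deserialize-then-run shape of `execute_workload` can be sketched as follows. The JSON payload and helper name here are hypothetical; the real provider deserializes an Airflow `workloads.ExecuteTask` object rather than plain JSON:

```python
import json

def execute_workload_sketch(input: str) -> str:
    """Illustrative stand-in for execute_workload's deserialize-then-run flow."""
    # Real code deserializes an Airflow workloads.ExecuteTask object instead
    workload = json.loads(input)
    # Dispatch the decoded task to the task runner (represented here by a string)
    return f"run {workload['dag_id']}.{workload['task_id']}"
```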

def execute_command(command_to_exec: CommandType) -> None:
    """
    Execute an Airflow command within a Celery worker (Airflow 2.x).
    
    This function is called by Celery workers to execute Airflow
    tasks in the legacy command-based format.
    
    Parameters:
    - command_to_exec: CommandType, command list to execute
    
    Raises:
    - AirflowTaskTimeout: If task execution exceeds timeout
    - Exception: Any exception raised during task execution
    """

def on_celery_import_modules(*args, **kwargs) -> None:
    """
    Celery signal handler for module import events.
    
    Called when Celery imports modules, used to set up proper
    configuration and logging for Airflow tasks.
    
    Parameters:
    - *args: Variable arguments from Celery signal
    - **kwargs: Keyword arguments from Celery signal
    """

Bulk State Management

Utility class for efficiently fetching states of multiple Celery tasks.

class BulkStateFetcher(LoggingMixin):
    """
    Efficiently fetch states of multiple Celery tasks in bulk.
    
    This class optimizes the process of checking states for large numbers
    of concurrent tasks by batching requests and managing connections.
    """
    
    def __init__(self, sync_parallelism=None):
        """
        Initialize the bulk state fetcher.
        
        Parameters:
        - sync_parallelism: int | None, number of parallel processes for fetching states
                           If None, defaults to max(1, cpu_count - 1)
        """
    
    def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
        """
        Fetch states for multiple tasks efficiently.
        
        Parameters:
        - async_results: Iterable of AsyncResult objects
        
        Returns:
        Mapping[str, EventBufferValueType]: mapping of task IDs to state information
        """
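The core idea behind `BulkStateFetcher` can be sketched with a worker pool: query many result handles concurrently instead of serially. `fetch_state` below is a stand-in for a real result-backend lookup (roughly `AsyncResult(task_id).state`), and a thread pool is used here only to keep the sketch self-contained:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_state(task_id: str) -> tuple[str, str]:
    # Stand-in: a real fetcher would query the Celery result backend
    return task_id, "SUCCESS"

def get_many_sketch(task_ids, sync_parallelism: int = 4) -> dict[str, str]:
    # Fan the state lookups out across a pool sized by sync_parallelism
    with ThreadPoolExecutor(max_workers=sync_parallelism) as pool:
        return dict(pool.map(fetch_state, task_ids))
```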

Configuration

Key configuration parameters for CeleryExecutor:

# Configuration accessed via airflow.configuration.conf

# Core Celery settings
CELERY_APP_NAME = "airflow.providers.celery.executors.celery_executor"
WORKER_CONCURRENCY = 16
SYNC_PARALLELISM = 0  # 0 means max(1, cpu_count - 1)

# Broker and backend
BROKER_URL = "redis://redis:6379/0"
RESULT_BACKEND = "db+postgresql://postgres:airflow@postgres/airflow"

# Task behavior
TASK_ACKS_LATE = True
TASK_TRACK_STARTED = True
TASK_PUBLISH_MAX_RETRIES = 3

# Worker settings
WORKER_PREFETCH_MULTIPLIER = 1
WORKER_ENABLE_REMOTE_CONTROL = True
OPERATION_TIMEOUT = 1.0

# SSL configuration
SSL_ACTIVE = False
SSL_KEY = ""
SSL_CERT = ""
SSL_CACERT = ""

# Pool implementation
POOL = "prefork"  # prefork, eventlet, gevent, solo
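The `SYNC_PARALLELISM = 0` sentinel ("use max(1, cpu_count - 1)") resolves as in this sketch; the function name is illustrative:

```python
import os

def resolve_sync_parallelism(configured: int) -> int:
    """Resolve the sync_parallelism setting, where 0 means 'auto'."""
    if configured > 0:
        return configured
    # Auto mode: leave one CPU free for the scheduler's own work
    return max(1, (os.cpu_count() or 1) - 1)
```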

Usage Examples

Basic Executor Configuration

# In airflow.cfg:
[core]
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor

[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://postgres:airflow@postgres/airflow
worker_concurrency = 16
sync_parallelism = 4
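The same settings can also be supplied as environment variables using Airflow's `AIRFLOW__SECTION__KEY` convention, which takes precedence over airflow.cfg. The values below mirror the config-file example above:

```shell
export AIRFLOW__CORE__EXECUTOR=airflow.providers.celery.executors.celery_executor.CeleryExecutor
export AIRFLOW__CELERY__BROKER_URL=redis://redis:6379/0
export AIRFLOW__CELERY__RESULT_BACKEND=db+postgresql://postgres:airflow@postgres/airflow
export AIRFLOW__CELERY__WORKER_CONCURRENCY=16
export AIRFLOW__CELERY__SYNC_PARALLELISM=4
```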

Programmatic Usage

from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.models.taskinstancekey import TaskInstanceKey

# Initialize executor
executor = CeleryExecutor()
executor.start()

# Submit a task (normally done by the Airflow scheduler).
# execute_async is the Airflow 2.x command-based path; Airflow 3.0+ uses queue_workload.
task_key = TaskInstanceKey(dag_id="my_dag", task_id="my_task", 
                          run_id="manual_run", try_number=1)
command = ["airflow", "tasks", "run", "my_dag", "my_task", "manual_run"]

executor.execute_async(key=task_key, command=command, queue="default")

# Sync task states
executor.sync()

# Cleanup
executor.end()

Custom Queue Configuration

# Route tasks to specific queues based on task properties
from airflow import DAG
from airflow.operators.python import PythonOperator

def my_heavy_task():
    # CPU intensive work
    pass

def my_light_task():
    # Light processing
    pass

dag = DAG('queue_routing', schedule=None)  # use schedule_interval=None on Airflow < 2.4

# Route to high-memory queue
heavy_task = PythonOperator(
    task_id='heavy_processing',
    python_callable=my_heavy_task,
    queue='high_memory',  # Celery queue name
    dag=dag
)

# Route to default queue
light_task = PythonOperator(
    task_id='light_processing', 
    python_callable=my_light_task,
    queue='default',
    dag=dag
)

Error Handling

class ExceptionWithTraceback:
    """
    Container for exceptions that preserves traceback information.
    
    Used to transport exception details from Celery workers back to
    the Airflow scheduler with full traceback context.
    """
    
    def __init__(self, exception: Exception, traceback_str: str):
        """
        Initialize exception container.
        
        Parameters:
        - exception: Exception, the original exception
        - traceback_str: str, formatted traceback string
        """

Common exceptions and error handling patterns:

from airflow.exceptions import AirflowTaskTimeout
from celery.exceptions import Retry, WorkerLostError

# Timeout handling
try:
    execute_workload(task_data)
except AirflowTaskTimeout:
    # Task exceeded execution time limit
    # Celery will mark task as failed
    raise

# Worker connection issues
try:
    # task_tuple is a TaskInstanceInCelery as described above
    key, command, result = send_task_to_executor(task_tuple)
except WorkerLostError:
    # Worker disconnected during task execution
    # Implement retry logic or fail the task
    raise

# Retry failed tasks
try:
    execute_workload(task_data)
except Exception:
    if retry_count < max_retries:
        raise Retry(countdown=retry_delay)
    raise

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-celery
