Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow
—
Quality: pending — not yet reviewed against best practices.
Impact: pending — no eval scenarios have been run.
The CeleryExecutor is the primary distributed task executor that uses Celery to distribute Airflow tasks across multiple worker nodes. It provides horizontal scalability, fault tolerance, and load distribution for production Airflow deployments.
Main executor class implementing Airflow's BaseExecutor interface for Celery-based task distribution.
class CeleryExecutor(BaseExecutor):
"""
Celery executor for distributed task execution.
This executor sends tasks to Celery workers running on multiple machines,
providing horizontal scalability and fault tolerance.
"""
# Class attributes
supports_ad_hoc_ti_run: bool = True
supports_sentry: bool = True
supports_pickling: bool = True
is_local: bool = False
is_single_threaded: bool = False
is_production: bool = True
serve_logs: bool = False
callback_sink: BaseCallbackSink | None = None
# For Airflow 3.0+
queued_tasks: dict[TaskInstanceKey, workloads.All]
def __init__(self) -> None:
"""
Initialize the CeleryExecutor.
Sets up the executor with default configuration and prepares
for Celery app initialization.
"""
def start(self) -> None:
"""
Start the executor and initialize Celery app connection.
Initializes the Celery application, sets up task queues, and prepares
for task submission.
"""
def queue_workload(self, workload: workloads.All, session: Session | None = None) -> None:
"""
Queue a workload for execution via Celery (Airflow 3.0+).
Parameters:
- workload: workloads.All, the workload to execute (typically ExecuteTask)
- session: Session | None, optional database session
"""
def sync(self) -> None:
"""
Synchronize task states between Airflow and Celery.
Fetches task states from Celery workers and updates Airflow's
task instance states accordingly.
"""
def debug_dump(self) -> None:
"""
Print debug information about current executor state.
Logs information about running tasks, queued tasks, and
executor configuration for debugging purposes.
"""
def update_all_task_states(self) -> None:
"""
Update states for all tracked tasks.
Queries Celery for current states of all running tasks
and updates the local task state tracking.
"""
def change_state(self, key: TaskInstanceKey, state: TaskInstanceState, info=None, remove_running: bool = True) -> None:
"""
Change the state of a specific task instance.
Parameters:
- key: TaskInstanceKey, unique identifier for the task instance
- state: TaskInstanceState, new state for the task
- info: Any, optional additional state information
- remove_running: bool, whether to remove from running tasks set
"""
def update_task_state(self, key: TaskInstanceKey, state: str, info: Any) -> None:
"""
Update the state of a task instance.
Parameters:
- key: TaskInstanceKey, unique identifier for the task instance
- state: str, new state as string
- info: Any, additional state information
"""
def end(self, synchronous: bool = False) -> None:
"""
Clean shutdown of the executor.
Parameters:
- synchronous: bool, whether to wait for all tasks to complete
Waits for running tasks to complete and gracefully shuts down
the Celery connection.
"""
def terminate(self) -> None:
"""
Force termination of all running tasks.
Immediately terminates all submitted tasks and shuts down
the executor.
"""
def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
"""
Try to adopt orphaned task instances.
Parameters:
- tis: Sequence[TaskInstance], task instances to attempt adoption
Returns:
Sequence[TaskInstance]: task instances that could not be adopted
"""
def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
"""
Clean up task instances stuck in queued state (deprecated).
Parameters:
- tis: list[TaskInstance], task instances to clean up
Returns:
list[str]: task instance keys that were cleaned up
"""
def revoke_task(self, *, ti: TaskInstance) -> None:
"""
Revoke a running task.
Parameters:
- ti: TaskInstance, task instance to revoke
Sends a revoke signal to Celery to terminate the task.
"""
@staticmethod
def get_cli_commands() -> list[GroupCommand]:
"""
Get CLI commands provided by this executor.
Returns:
list[GroupCommand]: list of CLI command groups
"""Utility functions for sending tasks to Celery workers and fetching task states.
def send_task_to_executor(task_tuple: TaskInstanceInCelery) -> tuple[TaskInstanceKey, CommandType, AsyncResult | ExceptionWithTraceback]:
"""
Send a task to the Celery executor.
Parameters:
- task_tuple: TaskInstanceInCelery, tuple containing (TaskInstance, CommandType)
Returns:
tuple containing:
- TaskInstanceKey: unique identifier for the task instance
- CommandType: the command that was executed
- AsyncResult | ExceptionWithTraceback: Celery result or exception
"""
def fetch_celery_task_state(async_result: AsyncResult) -> tuple[str, str | ExceptionWithTraceback, Any]:
"""
Fetch the current state of a Celery task.
Parameters:
- async_result: AsyncResult, Celery result object
Returns:
tuple containing:
- str: Task state (PENDING, SUCCESS, FAILURE, etc.)
- str | ExceptionWithTraceback: Result data or exception info
- Any: Additional task metadata
"""Core functions for executing workloads within Celery workers.
def execute_workload(input: str) -> None:
"""
Execute an Airflow workload within a Celery worker (Airflow 3.0+).
This function is called by Celery workers to execute the actual
Airflow task. It deserializes the workload and runs the task.
Parameters:
- input: str, serialized workload containing task execution parameters
Raises:
- AirflowTaskTimeout: If task execution exceeds timeout
- Exception: Any exception raised during task execution
"""
def execute_command(command_to_exec: CommandType) -> None:
"""
Execute an Airflow command within a Celery worker (Airflow 2.x).
This function is called by Celery workers to execute Airflow
tasks in the legacy command-based format.
Parameters:
- command_to_exec: CommandType, command list to execute
Raises:
- AirflowTaskTimeout: If task execution exceeds timeout
- Exception: Any exception raised during task execution
"""
def on_celery_import_modules(*args, **kwargs) -> None:
"""
Celery signal handler for module import events.
Called when Celery imports modules, used to set up proper
configuration and logging for Airflow tasks.
Parameters:
- *args: Variable arguments from Celery signal
- **kwargs: Keyword arguments from Celery signal
"""Utility class for efficiently fetching states of multiple Celery tasks.
class BulkStateFetcher(LoggingMixin):
"""
Efficiently fetch states of multiple Celery tasks in bulk.
This class optimizes the process of checking states for large numbers
of concurrent tasks by batching requests and managing connections.
"""
def __init__(self, sync_parallelism=None):
"""
Initialize the bulk state fetcher.
Parameters:
- sync_parallelism: int | None, number of parallel processes for fetching states
If None, defaults to max(1, cpu_count - 1)
"""
def get_many(self, async_results) -> Mapping[str, EventBufferValueType]:
"""
Fetch states for multiple tasks efficiently.
Parameters:
- async_results: Iterable of AsyncResult objects
Returns:
Mapping[str, EventBufferValueType]: mapping of task IDs to state information
"""Key configuration parameters for CeleryExecutor:
# Configuration accessed via airflow.configuration.conf
# Core Celery settings
CELERY_APP_NAME = "airflow.providers.celery.executors.celery_executor"
WORKER_CONCURRENCY = 16
SYNC_PARALLELISM = 0 # 0 means max(1, cpu_count - 1)
# Broker and backend
BROKER_URL = "redis://redis:6379/0"
RESULT_BACKEND = "db+postgresql://postgres:airflow@postgres/airflow"
# Task behavior
TASK_ACKS_LATE = True
TASK_TRACK_STARTED = True
TASK_PUBLISH_MAX_RETRIES = 3
# Worker settings
WORKER_PREFETCH_MULTIPLIER = 1
WORKER_ENABLE_REMOTE_CONTROL = True
OPERATION_TIMEOUT = 1.0
# SSL configuration
SSL_ACTIVE = False
SSL_KEY = ""
SSL_CERT = ""
SSL_CACERT = ""
# Pool implementation
POOL = "prefork" # prefork, eventlet, gevent, solo# In airflow.cfg:
[core]
executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor
[celery]
broker_url = redis://redis:6379/0
result_backend = db+postgresql://postgres:airflow@postgres/airflow
worker_concurrency = 16
sync_parallelism = 4

from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.models.taskinstancekey import TaskInstanceKey
# Initialize executor
executor = CeleryExecutor()
executor.start()
# Submit a task (typically done by Airflow scheduler)
task_key = TaskInstanceKey(dag_id="my_dag", task_id="my_task",
run_id="manual_run", try_number=1)
command = ["airflow", "tasks", "run", "my_dag", "my_task", "manual_run"]
executor.execute_async(key=task_key, command=command, queue="default")
# Sync task states
executor.sync()
# Cleanup
executor.end()

# Route tasks to specific queues based on task properties
from airflow import DAG
from airflow.operators.python import PythonOperator
def my_heavy_task():
# CPU intensive work
pass
def my_light_task():
# Light processing
pass
dag = DAG('queue_routing', schedule_interval=None)
# Route to high-memory queue
heavy_task = PythonOperator(
task_id='heavy_processing',
python_callable=my_heavy_task,
queue='high_memory', # Celery queue name
dag=dag
)
# Route to default queue
light_task = PythonOperator(
task_id='light_processing',
python_callable=my_light_task,
queue='default',
dag=dag
)

class ExceptionWithTraceback:
"""
Container for exceptions that preserves traceback information.
Used to transport exception details from Celery workers back to
the Airflow scheduler with full traceback context.
"""
def __init__(self, exception: Exception, traceback_str: str):
"""
Initialize exception container.
Parameters:
- exception: Exception, the original exception
- traceback_str: str, formatted traceback string
"""Common exceptions and error handling patterns:
from airflow.exceptions import AirflowTaskTimeout
from celery.exceptions import Retry, WorkerLostError
# Timeout handling
try:
execute_workload(task_data)
except AirflowTaskTimeout:
# Task exceeded execution time limit
# Celery will mark task as failed
raise
# Worker connection issues
try:
    result = send_task_to_executor(task_tuple)
except WorkerLostError:
# Worker disconnected during task execution
# Implement retry logic or fail task
raise
# Retry failed tasks
except Exception as e:
if retry_count < max_retries:
raise Retry(countdown=retry_delay)
else:
        raise e

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-celery