Provider package that enables distributed task execution using Celery as the task queue for Apache Airflow
```bash
npx @tessl/cli install tessl/pypi-apache-airflow-providers-celery@3.12.0
```

A provider package that enables distributed task execution using Celery as the task queue for Apache Airflow. This package provides Celery-based executors, monitoring sensors, and CLI commands for running Airflow tasks across multiple worker nodes with horizontal scalability and fault tolerance.
```bash
pip install apache-airflow-providers-celery
```

```python
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor
```

```python
# Using CeleryExecutor in airflow.cfg
# [core]
# executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor
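# The executor also needs a Celery broker and result backend; broker_url and
# result_backend are the standard [celery] options (the URLs below are illustrative)
# [celery]
# broker_url = redis://localhost:6379/0
# result_backend = db+postgresql://airflow:airflow@localhost/airflow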
# Using CeleryQueueSensor in DAGs
from airflow import DAG
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor
from datetime import datetime, timedelta
dag = DAG(
    'celery_monitoring',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule=timedelta(hours=1),
    catchup=False
)
# Wait for a specific Celery queue to be empty
queue_sensor = CeleryQueueSensor(
    task_id='wait_for_queue_empty',
    celery_queue='high_priority',
    timeout=300,
    poke_interval=30,
    dag=dag
)
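# Variant (sketch): when target_task_id is given, the sensor waits for that
# specific Celery task to complete instead of the whole queue draining;
# the task id below is a placeholder, not a real value
task_sensor = CeleryQueueSensor(
    task_id='wait_for_celery_task',
    celery_queue='high_priority',
    target_task_id='d3c9af61-example-celery-task-id',
    dag=dag
)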
# CLI usage for starting workers and monitoring
# airflow celery worker --concurrency 16
# airflow celery flower --port 5555
```

The package follows Airflow's executor pattern and Celery's distributed task queue architecture:
Primary executor for distributed task execution using Celery. Routes Airflow tasks to Celery workers running across multiple machines, providing horizontal scalability and fault tolerance.
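A minimal sketch of how a task reaches a specific pool of Celery workers under this executor (the queue and DAG names are illustrative, and the BashOperator import path assumes the Airflow 2.x location):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id='queue_routing_example', start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # This task only runs on workers subscribed to 'high_priority',
    # e.g. workers started with: airflow celery worker --queues high_priority
    build_report = BashOperator(
        task_id='build_report',
        bash_command='echo "building report"',
        queue='high_priority',
    )
```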
```python
class CeleryExecutor(BaseExecutor):
    def start(self): ...
    def queue_workload(self, workload: workloads.All, session: Session | None = None): ...
    def sync(self): ...
    def end(self, synchronous: bool = False): ...
    def terminate(self): ...
```

Hybrid executor that routes tasks between CeleryExecutor and KubernetesExecutor based on queue names. Tasks in the 'kubernetes' queue run via KubernetesExecutor, others via CeleryExecutor.
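For example, a sketch of hybrid routing ('kubernetes' is the routing queue described above; the DAG and task names are illustrative, and the BashOperator import path assumes the Airflow 2.x location):

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(dag_id='hybrid_routing_example', start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # Default queue: executed by a Celery worker
    light_task = BashOperator(task_id='light_task', bash_command='echo "quick job"')

    # 'kubernetes' queue: handed off to the KubernetesExecutor
    heavy_task = BashOperator(
        task_id='heavy_task',
        bash_command='echo "resource-intensive job"',
        queue='kubernetes',
    )

    light_task >> heavy_task
```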
```python
class CeleryKubernetesExecutor(BaseExecutor):
    def start(self): ...
    def queue_command(self, task_instance: TaskInstance, command: CommandType, priority: int = 1, queue: str | None = None): ...
    def queue_task_instance(self, task_instance: TaskInstance, **kwargs): ...
    def sync(self): ...
    def end(self): ...
```

Sensor for monitoring Celery queue states, waiting for queues to be empty or checking specific task states.
```python
class CeleryQueueSensor(BaseSensorOperator):
    def __init__(self, *, celery_queue: str, target_task_id: str | None = None, **kwargs): ...
    def poke(self, context: Context) -> bool: ...
```

Command-line tools for managing Celery workers, monitoring with Flower, and queue operations.
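A sketch of typical invocations (queue names, concurrency, and port are illustrative):

```bash
# Start a worker that consumes two queues
airflow celery worker --queues default,high_priority --concurrency 8

# Launch the Flower monitoring UI
airflow celery flower --port 5555

# Gracefully stop the worker started from this host
airflow celery stop
```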
```python
def worker(args): ...
def flower(args): ...
def stop_worker(args): ...
def list_workers(args): ...
def shutdown_worker(args): ...
def shutdown_all_workers(args): ...
def add_queue(args): ...
def remove_queue(args): ...
```

The package provides extensive configuration options through Airflow's configuration system:
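For example, a minimal airflow.cfg sketch touching each section (the option keys are standard configuration keys; the values shown are illustrative):

```ini
[celery]
broker_url = redis://redis-host:6379/0
result_backend = db+postgresql://airflow:airflow@postgres-host/airflow
worker_concurrency = 16

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery_broker_transport_options]
visibility_timeout = 21600
```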
```python
# Key configuration sections:
# [celery] - Main Celery executor settings
# [celery_kubernetes_executor] - Hybrid executor settings
# [celery_broker_transport_options] - Broker transport configuration
```

```python
from typing import Any, Dict, List, Optional, Union
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.executors.base_executor import BaseExecutor, CommandType
# Celery-specific types
TaskTuple = tuple[TaskInstanceKey, CommandType, str, Any]
TaskInstanceInCelery = tuple[TaskInstance, CommandType]
```