# Apache Airflow Providers Celery

A provider package that enables distributed task execution using Celery as the task queue for Apache Airflow. It provides Celery-based executors, a queue-monitoring sensor, and CLI commands for running Airflow tasks across multiple worker nodes with horizontal scalability and fault tolerance.

## Package Information

- **Package Name**: apache-airflow-providers-celery
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-celery`

## Core Imports

```python
from airflow.providers.celery.executors.celery_executor import CeleryExecutor
from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor
```

## Basic Usage

```python
# Using CeleryExecutor in airflow.cfg
# [core]
# executor = airflow.providers.celery.executors.celery_executor.CeleryExecutor

# Using CeleryQueueSensor in DAGs
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor

dag = DAG(
    'celery_monitoring',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule=timedelta(hours=1),
    catchup=False,
)

# Wait for a specific Celery queue to be empty
queue_sensor = CeleryQueueSensor(
    task_id='wait_for_queue_empty',
    celery_queue='high_priority',
    timeout=300,
    poke_interval=30,
    dag=dag,
)

# CLI usage for starting workers and monitoring
# airflow celery worker --concurrency 16
# airflow celery flower --port 5555
```

## Architecture

The package follows Airflow's executor pattern and Celery's distributed task queue architecture; a runtime sketch follows the list below:

- **Executors**: Implement Airflow's BaseExecutor interface to distribute tasks via Celery
- **Task Serialization**: Airflow tasks are serialized and sent to Celery workers as jobs
- **Result Backend**: Task states and results are stored in a shared backend (database/Redis)
- **Broker**: Message queue (Redis/RabbitMQ) handles task distribution between scheduler and workers
- **Workers**: Celery worker processes execute Airflow tasks on distributed nodes
- **Monitoring**: Flower provides web-based monitoring and administration of Celery clusters
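
These components can be observed at runtime with Celery's own inspection API, which surfaces the same data Flower presents. A minimal sketch, assuming the `celery` library is installed and a broker is reachable; the broker/backend URLs are illustrative:

```python
from celery import Celery

# Illustrative broker/backend URLs; point these at your own infrastructure.
app = Celery(broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

insp = app.control.inspect()
print(insp.active_queues())  # queues each worker consumes from
print(insp.active())         # tasks currently executing on workers
print(insp.stats())          # per-worker statistics (what Flower displays)
```
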
## Capabilities

### Celery Executor

Primary executor for distributed task execution using Celery. Routes Airflow tasks to Celery workers running across multiple machines, providing horizontal scalability and fault tolerance.

```python { .api }
class CeleryExecutor(BaseExecutor):
    def start(self): ...
    def queue_workload(self, workload: workloads.All, session: Session | None = None): ...
    def sync(self): ...
    def end(self, synchronous: bool = False): ...
    def terminate(self): ...
```
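
Individual tasks can be pinned to a dedicated Celery queue via the operator-level `queue` argument, with workers subscribed through `--queues`. A sketch reusing the `dag` object from Basic Usage; the queue name, task, and Airflow 2.x import path are illustrative:

```python
from airflow.operators.bash import BashOperator

# Only workers started with `airflow celery worker --queues high_memory`
# will consume this task; everything else stays on the default queue.
heavy = BashOperator(
    task_id="heavy_job",
    bash_command="echo heavy",
    queue="high_memory",  # illustrative queue name
    dag=dag,  # reuses the DAG defined in Basic Usage
)
```
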
[Celery Executor](./celery-executor.md)

### Celery Kubernetes Executor

Hybrid executor that routes tasks between CeleryExecutor and KubernetesExecutor based on queue names. Tasks in the 'kubernetes' queue run via KubernetesExecutor; all others run via CeleryExecutor.

```python { .api }
class CeleryKubernetesExecutor(BaseExecutor):
    def start(self): ...
    def queue_command(self, task_instance: TaskInstance, command: CommandType, priority: int = 1, queue: str | None = None): ...
    def queue_task_instance(self, task_instance: TaskInstance, **kwargs): ...
    def sync(self): ...
    def end(self): ...
```
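
Routing is driven entirely by the task's `queue` attribute; the queue name that selects KubernetesExecutor defaults to `kubernetes` and is configurable via `[celery_kubernetes_executor] kubernetes_queue`. A sketch reusing the `dag` from Basic Usage; task names and import path are illustrative:

```python
from airflow.operators.bash import BashOperator

# Executed by Celery workers (default queue).
on_celery = BashOperator(task_id="on_celery", bash_command="echo celery", dag=dag)

# Executed via KubernetesExecutor purely because of the queue name
# (configurable through [celery_kubernetes_executor] kubernetes_queue).
on_k8s = BashOperator(
    task_id="on_k8s",
    bash_command="echo k8s",
    queue="kubernetes",
    dag=dag,
)
```
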
[Celery Kubernetes Executor](./celery-kubernetes-executor.md)

### Queue Monitoring

Sensor for monitoring Celery queue state: it waits for a queue to be empty or, when a target task is given, for that task to complete.

```python { .api }
class CeleryQueueSensor(BaseSensorOperator):
    def __init__(self, *, celery_queue: str, target_task_id: str | None = None, **kwargs): ...
    def poke(self, context: Context) -> bool: ...
```
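
With `target_task_id` set, the sensor waits on the state of that one Celery task rather than the whole queue. A sketch reusing the `dag` from Basic Usage; the id shown is a hypothetical Celery task identifier, not an Airflow task_id:

```python
from airflow.providers.celery.sensors.celery_queue import CeleryQueueSensor

task_sensor = CeleryQueueSensor(
    task_id="wait_for_celery_task",
    celery_queue="high_priority",
    target_task_id="0cf12f4c-hypothetical-task-id",  # hypothetical Celery task id
    poke_interval=30,
    dag=dag,  # reuses the DAG defined in Basic Usage
)
```
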
[Queue Monitoring](./queue-monitoring.md)

### CLI Commands

Command-line tools for managing Celery workers, monitoring with Flower, and queue operations.

```python { .api }
def worker(args): ...
def flower(args): ...
def stop_worker(args): ...
def list_workers(args): ...
def shutdown_worker(args): ...
def shutdown_all_workers(args): ...
def add_queue(args): ...
def remove_queue(args): ...
```
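
In the same comment convention as Basic Usage, typical invocations look like this. A sketch: `worker`, `stop`, and `flower` are documented `airflow celery` subcommands with the flags shown; mappings for the remaining functions are assumed to follow the same pattern:

```python
# Start a worker consuming two queues, with 16 worker processes
# airflow celery worker --queues default,high_priority --concurrency 16

# Gracefully stop a running worker (uses the worker's PID file)
# airflow celery stop

# Launch the Flower monitoring UI
# airflow celery flower --port 5555
```
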
[CLI Commands](./cli-commands.md)

## Configuration

The package provides extensive configuration options through Airflow's configuration system:

```python
# Key configuration sections:
# [celery] - Main Celery executor settings
# [celery_kubernetes_executor] - Hybrid executor settings
# [celery_broker_transport_options] - Broker transport configuration
```
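
A minimal sketch of commonly tuned options, in the same comment style; the option names are standard Airflow settings, while the values are illustrative rather than defaults:

```python
# Example airflow.cfg snippet (values illustrative):
#
# [celery]
# broker_url = redis://redis:6379/0
# result_backend = db+postgresql://airflow:airflow@postgres/airflow
# worker_concurrency = 16
#
# [celery_kubernetes_executor]
# kubernetes_queue = kubernetes
#
# [celery_broker_transport_options]
# visibility_timeout = 21600
```
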
[Configuration](./configuration.md)

## Types

```python { .api }
from typing import Any

from airflow.executors.base_executor import BaseExecutor, CommandType
from airflow.models.taskinstance import TaskInstance
from airflow.models.taskinstancekey import TaskInstanceKey

# Celery-specific types
TaskTuple = tuple[TaskInstanceKey, CommandType, str, Any]
TaskInstanceInCelery = tuple[TaskInstance, CommandType]
```
```