# Celery Kubernetes Executor

The CeleryKubernetesExecutor is a hybrid executor that routes each task to either the CeleryExecutor or the KubernetesExecutor based on its queue name, allowing flexible task execution strategies within a single Airflow deployment.

## Capabilities

### CeleryKubernetesExecutor Class

Hybrid executor class that combines the Celery and Kubernetes execution strategies.
```python { .api }
class CeleryKubernetesExecutor(BaseExecutor):
    """
    Hybrid executor routing tasks between Celery and Kubernetes executors.

    Tasks assigned to the 'kubernetes' queue (configurable) are executed
    via KubernetesExecutor, while all other tasks use CeleryExecutor.
    """

    # Class attributes
    supports_ad_hoc_ti_run: bool = True
    supports_pickling: bool = True
    supports_sentry: bool = False
    is_local: bool = False
    is_single_threaded: bool = False
    is_production: bool = True
    serve_logs: bool = False
    callback_sink: BaseCallbackSink | None = None

    def __init__(
        self,
        celery_executor: CeleryExecutor | None = None,
        kubernetes_executor: KubernetesExecutor | None = None,
    ):
        """
        Initialize the hybrid executor.

        Parameters:
        - celery_executor: CeleryExecutor | None, optional CeleryExecutor instance
        - kubernetes_executor: KubernetesExecutor | None, optional KubernetesExecutor instance
        """

    @property
    def kubernetes_queue(self) -> str:
        """
        Get the queue name that routes tasks to KubernetesExecutor.

        Returns:
        str: queue name (default: 'kubernetes')
        """

    @property
    def queued_tasks(self) -> dict[TaskInstanceKey, Any]:
        """
        Get combined queued tasks from both executors.

        Returns:
        dict[TaskInstanceKey, Any]: all queued tasks
        """

    @property
    def running(self) -> set[TaskInstanceKey]:
        """
        Get combined running tasks from both executors.

        Returns:
        set[TaskInstanceKey]: all running task keys
        """

    @property
    def job_id(self) -> int | str | None:
        """
        Get the job ID for this executor instance.

        Returns:
        int | str | None: executor job identifier
        """

    @property
    def slots_available(self) -> int:
        """
        Get total available slots across both executors.

        Returns:
        int: number of available execution slots
        """

    @property
    def slots_occupied(self) -> int:
        """
        Get total occupied slots across both executors.

        Returns:
        int: number of occupied execution slots
        """

    def start(self) -> None:
        """
        Start both underlying executors.

        Initializes and starts both the CeleryExecutor and KubernetesExecutor
        instances for handling their respective tasks.
        """

    def queue_command(self, task_instance: TaskInstance, command: CommandType,
                      priority: int = 1, queue: str | None = None) -> None:
        """
        Route a task to the appropriate executor based on its queue name.

        Parameters:
        - task_instance: TaskInstance to execute
        - command: CommandType containing the task execution command
        - priority: int, task priority level
        - queue: str | None, queue name determining the execution strategy

        Routing logic:
        - If queue == kubernetes_queue (default: 'kubernetes') -> KubernetesExecutor
        - All other queues -> CeleryExecutor
        """

    def queue_task_instance(self, task_instance: TaskInstance, **kwargs) -> None:
        """
        Queue a task instance to the appropriate executor based on the routing logic.

        Parameters:
        - task_instance: TaskInstance to queue
        - **kwargs: additional arguments passed to the underlying executor
        """

    def get_task_log(self, ti: TaskInstance, try_number: int) -> tuple[list[str], list[str]]:
        """
        Retrieve task logs from the appropriate executor.

        Parameters:
        - ti: TaskInstance to get logs for
        - try_number: int, task attempt number

        Returns:
        tuple[list[str], list[str]]: (messages, log_lines)
        """

    def has_task(self, task_instance: TaskInstance) -> bool:
        """
        Check whether a task instance is tracked by either executor.

        Parameters:
        - task_instance: TaskInstance to check

        Returns:
        bool: True if the task is tracked by either executor
        """

    def heartbeat(self) -> None:
        """
        Perform heartbeat operations on both executors.

        Calls heartbeat() on both underlying executors to maintain
        health and perform periodic maintenance.
        """

    def get_event_buffer(self, dag_ids: list[str] | None = None) -> dict[TaskInstanceKey, EventBufferValueType]:
        """
        Get the combined event buffer from both executors.

        Parameters:
        - dag_ids: list[str] | None, optional DAG IDs to filter events

        Returns:
        dict[TaskInstanceKey, EventBufferValueType]: combined event buffer
        """

    def try_adopt_task_instances(self, tis: Sequence[TaskInstance]) -> Sequence[TaskInstance]:
        """
        Attempt to adopt orphaned task instances across both executors.

        Parameters:
        - tis: Sequence[TaskInstance], potentially orphaned tasks

        Returns:
        Sequence[TaskInstance]: task instances that could not be adopted
        """

    def cleanup_stuck_queued_tasks(self, tis: list[TaskInstance]) -> list[str]:
        """
        Clean up stuck queued tasks on both executors (deprecated).

        Parameters:
        - tis: list[TaskInstance], tasks to clean up

        Returns:
        list[str]: task instance keys that were cleaned up
        """

    def revoke_task(self, *, ti: TaskInstance) -> None:
        """
        Revoke a running task on the appropriate executor.

        Parameters:
        - ti: TaskInstance, task instance to revoke

        Routes the revocation to the correct underlying executor.
        """

    def end(self) -> None:
        """
        Gracefully shut down both executors.

        Performs a clean shutdown of both the CeleryExecutor and
        KubernetesExecutor instances.
        """

    def terminate(self) -> None:
        """
        Force termination of tasks on both executors.

        Immediately terminates tasks running on both Celery workers
        and Kubernetes pods.
        """

    def debug_dump(self) -> None:
        """
        Print debug information from both executors.

        Calls debug_dump() on both underlying executors to provide
        comprehensive debugging information.
        """

    def send_callback(self, request: CallbackRequest) -> None:
        """
        Send a callback request via the configured callback sink.

        Parameters:
        - request: CallbackRequest, callback to send
        """

    @staticmethod
    def get_cli_commands() -> list:
        """
        Get CLI commands provided by this executor.

        Returns:
        list: CLI command groups contributed by the underlying executors
        """
```

### Queue Routing Logic

The executor determines the execution strategy from each task's queue name:

```python { .api }
# Queue routing is handled internally by the executor
# based on the kubernetes_queue configuration property
```
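
The dispatch rule itself is a single comparison. The following is an illustrative sketch of that rule, not Airflow API: `route_task` and `KUBERNETES_QUEUE` are names invented here, while the real check happens inside `queue_command`/`queue_task_instance` against the `kubernetes_queue` property.

```python
from __future__ import annotations

# Illustrative sketch of the queue-based dispatch rule; not Airflow API.
KUBERNETES_QUEUE = "kubernetes"  # value of [celery_kubernetes_executor] kubernetes_queue

def route_task(queue: str | None, kubernetes_queue: str = KUBERNETES_QUEUE) -> str:
    """Return which underlying executor would receive a task on `queue`."""
    if queue == kubernetes_queue:
        return "kubernetes"  # handled by KubernetesExecutor
    return "celery"          # every other queue falls through to CeleryExecutor

route_task("kubernetes")   # -> "kubernetes"
route_task("default")      # -> "celery"
route_task("high_memory")  # -> "celery"
route_task(None)           # -> "celery"
```

Note that only an exact match on the configured queue name selects Kubernetes; any other queue, including `None`, stays on Celery.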

## Configuration

Configuration for the hybrid executor's routing behavior:

```python { .api }
# Configuration section: [celery_kubernetes_executor]

KUBERNETES_QUEUE = "kubernetes"  # Queue name for Kubernetes execution

# Both underlying executors use their respective configuration sections:
# [celery] - for CeleryExecutor settings
# [kubernetes_executor] - for KubernetesExecutor settings
```
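
Since `airflow.cfg` is standard INI, the configured routing queue can be sanity-checked outside Airflow with the stdlib `configparser` module. The snippet below reads an inline config fragment rather than a real file; the section and option names match the ones above.

```python
# Read the routing queue from an airflow.cfg-style INI fragment (stdlib only).
import configparser

cfg_text = """\
[celery_kubernetes_executor]
kubernetes_queue = kubernetes
"""

parser = configparser.ConfigParser()
parser.read_string(cfg_text)

# Fall back to the documented default if the option is absent.
queue = parser.get("celery_kubernetes_executor", "kubernetes_queue",
                   fallback="kubernetes")
print(queue)  # kubernetes
```

In a real deployment you would pass the path to `airflow.cfg` to `parser.read()` instead of `read_string()`.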

## Usage Examples

### Basic Configuration

```ini
# In airflow.cfg:
[core]
executor = airflow.providers.celery.executors.celery_kubernetes_executor.CeleryKubernetesExecutor

[celery_kubernetes_executor]
kubernetes_queue = kubernetes

[celery]
# Standard Celery configuration
broker_url = redis://redis:6379/0
result_backend = db+postgresql://postgres:airflow@postgres/airflow
worker_concurrency = 16

[kubernetes_executor]
# Standard Kubernetes configuration
namespace = airflow
worker_container_repository = airflow-workers
worker_container_tag = latest
```

### Task Queue Assignment

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

dag = DAG('hybrid_execution', schedule_interval=None)

# This task runs on Kubernetes (ephemeral pod)
k8s_task = PythonOperator(
    task_id='kubernetes_task',
    python_callable=lambda: print("Running on Kubernetes"),
    queue='kubernetes',  # Routes to KubernetesExecutor
    dag=dag
)

# This task runs on Celery workers (persistent workers)
celery_task = BashOperator(
    task_id='celery_task',
    bash_command='echo "Running on Celery"',
    queue='default',  # Routes to CeleryExecutor
    dag=dag
)

# A custom queue also routes to Celery
high_memory_task = PythonOperator(
    task_id='high_memory_task',
    python_callable=lambda: print("High memory processing"),
    queue='high_memory',  # Routes to CeleryExecutor
    dag=dag
)

k8s_task >> [celery_task, high_memory_task]
```

### Programmatic Usage

```python
from airflow.providers.celery.executors.celery_kubernetes_executor import CeleryKubernetesExecutor
from airflow.models.taskinstancekey import TaskInstanceKey

# Initialize the hybrid executor
executor = CeleryKubernetesExecutor()
executor.start()

# Task routed to Kubernetes; executors require an "airflow tasks run" command
k8s_key = TaskInstanceKey(dag_id="my_dag", task_id="k8s_task",
                          run_id="manual_run", try_number=1)
k8s_command = ["airflow", "tasks", "run", "my_dag", "k8s_task", "manual_run"]

executor.execute_async(key=k8s_key, command=k8s_command, queue="kubernetes")

# Task routed to Celery
celery_key = TaskInstanceKey(dag_id="my_dag", task_id="celery_task",
                             run_id="manual_run", try_number=1)
celery_command = ["airflow", "tasks", "run", "my_dag", "celery_task", "manual_run"]

executor.execute_async(key=celery_key, command=celery_command, queue="default")

# Sync states from both executors
executor.sync()

# Cleanup
executor.end()
```

### Custom Queue Configuration

A custom Kubernetes queue name can be configured:

```ini
# In airflow.cfg:
[celery_kubernetes_executor]
kubernetes_queue = special_k8s_queue
```

```python
# In the DAG:
special_k8s_task = PythonOperator(
    task_id='special_kubernetes_task',
    python_callable=my_function,
    queue='special_k8s_queue',  # Routes to KubernetesExecutor
    dag=dag
)
```

## Use Cases

### Resource-Based Task Routing

Route tasks based on their resource requirements:

```python
from kubernetes.client import models as k8s

# CPU-intensive tasks on Kubernetes (auto-scaling, per-task resource requests)
cpu_intensive_task = PythonOperator(
    task_id='ml_training',
    python_callable=train_model,
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(
                            requests={'cpu': '4', 'memory': '8Gi'},
                            limits={'cpu': '8', 'memory': '16Gi'},
                        ),
                    )
                ]
            )
        )
    },
    dag=dag
)

# I/O-intensive tasks on persistent Celery workers
io_intensive_task = PythonOperator(
    task_id='data_processing',
    python_callable=process_large_files,
    queue='io_intensive',  # Routes to specialized Celery workers
    dag=dag
)
```

### Environment Isolation

Isolate tasks with different dependency requirements:

```python
from kubernetes.client import models as k8s

# Task requiring special libraries (isolated pod with a custom image)
special_deps_task = BashOperator(
    task_id='special_processing',
    bash_command='python special_script.py',
    queue='kubernetes',
    executor_config={
        'pod_override': k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(name='base', image='my-special-image:latest')
                ]
            )
        )
    },
    dag=dag
)

# Standard task on regular Celery workers
standard_task = PythonOperator(
    task_id='standard_processing',
    python_callable=standard_function,
    queue='default',
    dag=dag
)
```

## Monitoring and Troubleshooting

### Task Tracking

Tasks are tracked separately by each underlying executor. Use the Airflow UI to see which executor handled each task: Celery tasks appear in Flower monitoring, while Kubernetes tasks appear in the Kubernetes dashboard or via `kubectl`.

Log aggregation works across both execution environments:

```python
def get_task_logs(dag_id: str, task_id: str, execution_date: str, try_number: int):
    """
    Retrieve logs from either Celery or Kubernetes based on the task's queue.

    The hybrid executor automatically determines the correct source
    for log retrieval based on where the task was executed.
    """
```

### Performance Considerations

Balance work between Celery and Kubernetes based on their respective strengths.

Celery advantages:

- Persistent workers (faster task startup)
- Better for high-frequency, short-duration tasks
- Shared state and caching between tasks
- Lower resource overhead

Kubernetes advantages:

- Resource isolation per task
- Auto-scaling based on workload
- Better for resource-intensive tasks
- Clean environment per execution
- Support for different container images per task
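
These trade-offs can be folded into a small routing helper used by DAG authors. This is a sketch under the assumption that tasks declare a rough resource profile; `pick_queue` and its thresholds are illustrative names invented here, not part of the provider.

```python
# Hypothetical helper mapping a task's resource profile to a queue name.
# The function name and thresholds are illustrative; tune for your deployment.
def pick_queue(cpu: float = 1.0, memory_gb: float = 1.0,
               needs_isolation: bool = False,
               kubernetes_queue: str = "kubernetes") -> str:
    """Route heavy or isolation-sensitive tasks to Kubernetes, the rest to Celery."""
    if needs_isolation or cpu >= 4 or memory_gb >= 8:
        return kubernetes_queue  # ephemeral pod: isolation and autoscaling
    return "default"             # persistent Celery worker: fast startup

pick_queue(cpu=8, memory_gb=16)   # -> "kubernetes"
pick_queue()                      # -> "default"
pick_queue(needs_isolation=True)  # -> "kubernetes"
```

The returned string would then be passed as the operator's `queue` argument, keeping the routing decision in one place instead of scattered across task definitions.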