# Executors

Run Airflow tasks on Kubernetes infrastructure with executors that manage task distribution, pod creation, and lifecycle management across Kubernetes clusters.

## Capabilities

### Kubernetes Executor

Execute Airflow tasks as Kubernetes pods with comprehensive cluster management, resource allocation, and monitoring.

```python { .api }
class KubernetesExecutor(BaseExecutor):
    """
    Executes tasks in Kubernetes pods.

    This executor launches each task as a separate Kubernetes pod, providing
    isolation, scalability, and resource management for Airflow task execution.

    Configuration is typically done through airflow.cfg under the
    [kubernetes_executor] section.

    Key Features:
    - Task isolation in separate pods
    - Dynamic resource allocation
    - Multi-namespace support
    - Pod template customization
    - Automatic cleanup and monitoring
    """
    def __init__(self, **kwargs): ...

    def start(self) -> None:
        """
        Start the executor.

        Initializes the Kubernetes client, starts the job watcher process,
        and prepares the executor for task scheduling.
        """
        ...

    def sync(self) -> None:
        """
        Synchronize task states.

        Updates task states based on pod status, processes completed tasks,
        and handles task state transitions.
        """
        ...

    def end(self) -> None:
        """
        End the executor.

        Gracefully shuts down the executor, cleans up resources,
        and terminates background processes.
        """
        ...

    def terminate(self) -> None:
        """
        Terminate the executor.

        Forcefully terminates the executor and all associated processes.
        """
        ...

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: str | None = None,
        executor_config: dict | None = None
    ) -> None:
        """
        Execute a task asynchronously.

        Args:
            key: Task instance key
            command: Command to execute
            queue: Execution queue (optional)
            executor_config: Executor-specific configuration
        """
        ...

    def adopt_launched_task(self, ti: TaskInstance, last_heartbeat: datetime, session: Session) -> bool:
        """
        Adopt a launched task.

        Args:
            ti: Task instance to adopt
            last_heartbeat: Last heartbeat timestamp
            session: Database session

        Returns:
            bool: True if the task was successfully adopted
        """
        ...
```
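
The executor is selected through Airflow's `[core] executor` setting rather than instantiated directly in DAG code. A minimal sketch, assuming a standard Airflow installation, for confirming which executor a deployment resolves to:

```python
# Minimal sketch: read the configured executor name from airflow.cfg
# (or its environment-variable overrides); prints e.g. "KubernetesExecutor".
from airflow.configuration import conf

executor_name = conf.get("core", "executor")
print(f"Configured executor: {executor_name}")
```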

### Local Kubernetes Executor

Hybrid executor that runs local tasks with LocalExecutor and Kubernetes tasks with KubernetesExecutor based on queue configuration.

```python { .api }
class LocalKubernetesExecutor(BaseExecutor):
    """
    Hybrid executor running LocalExecutor for local tasks and
    KubernetesExecutor for Kubernetes tasks.

    Tasks are routed to the appropriate executor based on the queue name.
    By default, tasks in the 'kubernetes' queue are executed on Kubernetes,
    while all other tasks are executed locally.

    Configuration:
    - kubernetes_queue: Queue name for Kubernetes tasks (default: 'kubernetes')
    """
    def __init__(self, local_executor: BaseExecutor, kubernetes_executor: BaseExecutor, **kwargs): ...

    def start(self) -> None:
        """Start both local and Kubernetes executors."""
        ...

    def sync(self) -> None:
        """Synchronize both executors."""
        ...

    def end(self) -> None:
        """End both executors."""
        ...

    def terminate(self) -> None:
        """Terminate both executors."""
        ...

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: str | None = None,
        executor_config: dict | None = None
    ) -> None:
        """
        Route task to the appropriate executor based on queue.

        Args:
            key: Task instance key
            command: Command to execute
            queue: Execution queue - determines executor selection
            executor_config: Executor-specific configuration
        """
        ...

    def has_task(self, ti: TaskInstance) -> bool:
        """Check if task exists in either executor."""
        ...

    def heartbeat(self) -> None:
        """Heartbeat both executors."""
        ...
```
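
The routing rule itself is simple. The sketch below is illustrative only (not the provider's internal code) and assumes the queue name matches the `kubernetes_queue` setting:

```python
# Illustrative sketch of LocalKubernetesExecutor's queue-based routing rule.
KUBERNETES_QUEUE = "kubernetes"  # [local_kubernetes_executor] kubernetes_queue

def route_to(queue: str | None) -> str:
    """Return the underlying executor a task with this queue would be sent to."""
    return "KubernetesExecutor" if queue == KUBERNETES_QUEUE else "LocalExecutor"

assert route_to("kubernetes") == "KubernetesExecutor"
assert route_to("default") == "LocalExecutor"
assert route_to(None) == "LocalExecutor"
```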

### Kubernetes Job Watcher

Process that monitors Kubernetes job status and manages job lifecycle for the executor.

```python { .api }
class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
    """
    Watches Kubernetes jobs for state changes.

    This process monitors pods created by the KubernetesExecutor,
    tracks their status changes, and communicates updates back
    to the main executor process.

    Args:
        namespace (str): Kubernetes namespace to watch
        multi_namespace_mode (bool): Enable multi-namespace watching
        watcher_queue (Queue): Queue for status updates
        resource_version (str, optional): Starting resource version
        worker_uuid (str): Unique identifier for this worker
        kube_config (Configuration): Kubernetes client configuration
    """
    def __init__(
        self,
        namespace: str,
        multi_namespace_mode: bool,
        watcher_queue: Queue,
        resource_version: str | None,
        worker_uuid: str,
        kube_config: Configuration,
        **kwargs
    ): ...

    def run(self) -> None:
        """Main process loop for watching Kubernetes jobs."""
        ...

    def _run(self) -> None:
        """Internal run method with error handling."""
        ...
```
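
Conceptually, the watcher is a long-running watch on worker pods. The sketch below uses the official `kubernetes` Python client directly to show the same idea; the namespace and label selector are assumptions, not the executor's actual values:

```python
# Hedged sketch of a pod-watch loop, similar in spirit to the job watcher
# process; not the provider's implementation.
from kubernetes import client, config, watch

config.load_kube_config()  # use config.load_incluster_config() inside a cluster
v1 = client.CoreV1Api()

w = watch.Watch()
for event in w.stream(
    v1.list_namespaced_pod,
    namespace="airflow",              # assumed namespace
    label_selector="airflow-worker",  # assumed label
    timeout_seconds=60,
):
    pod = event["object"]
    print(event["type"], pod.metadata.name, pod.status.phase)
```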

### Airflow Kubernetes Scheduler

Scheduler component that manages pod creation and task distribution on Kubernetes clusters.

```python { .api }
class AirflowKubernetesScheduler(LoggingMixin):
    """
    Schedules pods on Kubernetes.

    Manages the creation, monitoring, and cleanup of Kubernetes pods
    for task execution. Handles pod templates, resource allocation,
    and namespace management.

    Args:
        kube_config (Configuration): Kubernetes client configuration
        task_queue (Queue): Queue of tasks to execute
        result_queue (Queue): Queue for execution results
        kube_client (ApiClient): Kubernetes API client
        worker_uuid (str): Unique identifier for this worker
    """
    def __init__(
        self,
        kube_config: Configuration,
        task_queue: Queue,
        result_queue: Queue,
        kube_client: ApiClient,
        worker_uuid: str,
        **kwargs
    ): ...

    def run_pod_async(self, pod: V1Pod, **kwargs) -> None:
        """Run pod asynchronously."""
        ...

    def delete_pod(self, pod_name: str, namespace: str) -> None:
        """Delete a pod."""
        ...

    def patch_pod_executor_done(self, pod_name: str, namespace: str) -> None:
        """Mark pod as executor done."""
        ...

    def _make_safe_pod_id(self, safe_dag_id: str, safe_task_id: str, safe_run_id: str) -> str:
        """Generate safe pod identifier."""
        ...
```
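
Pod names must be valid DNS-1123 labels, which is why the scheduler exposes a helper for building safe identifiers. A rough sketch of that kind of sanitisation (illustrative only, not the provider's algorithm):

```python
# Illustrative sketch: normalise an identifier so it can safely appear in a pod name.
import re

def make_safe(value: str, max_len: int = 63) -> str:
    safe = re.sub(r"[^a-z0-9-]+", "-", value.lower()).strip("-")
    return safe[:max_len]

print(make_safe("My_DAG.task#1"))  # my-dag-task-1
```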

### Resource Version Manager

Singleton class that manages Kubernetes resource versions for efficient watching.

```python { .api }
class ResourceVersion(metaclass=Singleton):
    """
    Manages Kubernetes resource versions.

    Tracks resource versions for efficient watching of Kubernetes
    resources, enabling incremental updates and reducing API load.
    """
    def __init__(self): ...

    def get_resource_version(self) -> str:
        """Get current resource version."""
        ...

    def set_resource_version(self, version: str) -> None:
        """Set resource version."""
        ...
```
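
`Singleton` here refers to a metaclass that guarantees one shared instance per process. An illustrative sketch of that pattern (not the provider's implementation):

```python
# Illustrative singleton metaclass: repeated construction returns the same object.
class Singleton(type):
    _instances: dict[type, object] = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class ResourceVersionDemo(metaclass=Singleton):
    def __init__(self):
        self.resource_version = "0"

assert ResourceVersionDemo() is ResourceVersionDemo()
```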

### Utility Functions

Helper functions for executor operations and pod management.

```python { .api }
def get_base_pod_from_template(
    pod_template_file: str | None,
    kube_client: ApiClient
) -> V1Pod:
    """
    Get base pod from template.

    Args:
        pod_template_file: Path to pod template file
        kube_client: Kubernetes API client

    Returns:
        V1Pod: Base pod specification
    """
    ...
```
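
For reference, the kind of object this helper returns can be built directly with the Kubernetes client models. The values below (container named `base`, namespace `airflow`) are illustrative:

```python
# Hedged sketch: a minimal V1Pod resembling a base worker pod.
from kubernetes.client import models as k8s

base_pod = k8s.V1Pod(
    metadata=k8s.V1ObjectMeta(name="airflow-worker", namespace="airflow"),
    spec=k8s.V1PodSpec(
        containers=[k8s.V1Container(name="base", image="apache/airflow:2.7.0")],
        restart_policy="Never",
    ),
)
print(base_pod.metadata.name)
```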

## Configuration

The Kubernetes Executor is configured through the `airflow.cfg` file under the `[kubernetes_executor]` section:

### Core Configuration

```ini
[kubernetes_executor]
# Kubernetes namespace for pods
namespace = default

# Pod template file (optional)
pod_template_file = /path/to/pod_template.yaml

# Worker container image
worker_container_repository = apache/airflow
worker_container_tag = 2.7.0

# Delete pods after completion
delete_worker_pods = True
delete_worker_pods_on_failure = False

# In-cluster configuration
in_cluster = True

# Cluster context (for out-of-cluster)
cluster_context = my-cluster

# Config file path (for out-of-cluster)
config_file = ~/.kube/config
```
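
Each of these options can also be supplied through Airflow's `AIRFLOW__<SECTION>__<KEY>` environment-variable convention, which is common in containerised deployments. A small sketch (values are examples only, normally set in the deployment manifest rather than in Python):

```python
# Environment-variable equivalents of the airflow.cfg settings above.
import os

os.environ["AIRFLOW__CORE__EXECUTOR"] = "KubernetesExecutor"
os.environ["AIRFLOW__KUBERNETES_EXECUTOR__NAMESPACE"] = "default"
os.environ["AIRFLOW__KUBERNETES_EXECUTOR__DELETE_WORKER_PODS"] = "True"
os.environ["AIRFLOW__KUBERNETES_EXECUTOR__IN_CLUSTER"] = "True"
```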

### Multi-Namespace Configuration

```ini
# Enable multi-namespace mode
multi_namespace_mode = True

# Specific namespaces (when not using cluster role)
multi_namespace_mode_namespace_list = namespace1,namespace2,namespace3
```

### Performance Configuration

```ini
# Number of pods to create per scheduler loop
worker_pods_creation_batch_size = 3

# Task publish retry configuration
task_publish_max_retries = 3

# API client retry configuration
api_client_retry_configuration = {"total": 3, "backoff_factor": 0.5}
```
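
The `api_client_retry_configuration` value is, as the provider documents it, a JSON mapping of keyword arguments for the Kubernetes API client's retry policy (urllib3's `Retry`). The example above corresponds roughly to:

```python
# Rough equivalent of the retry settings shown in the configuration example.
from urllib3.util import Retry

retry_policy = Retry(total=3, backoff_factor=0.5)
print(retry_policy.total, retry_policy.backoff_factor)
```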

### Resource Management

```ini
# Pod termination grace period
termination_grace_period_seconds = 30

# Delete options for pod cleanup
delete_option_kwargs = {"grace_period_seconds": 10}
```

## Usage Examples

### Basic Executor Configuration

```ini
# airflow.cfg configuration for KubernetesExecutor
[core]
executor = KubernetesExecutor

[kubernetes_executor]
namespace = airflow
worker_container_repository = my-registry/airflow
worker_container_tag = 2.7.0-custom
delete_worker_pods = True
in_cluster = True
```

### Pod Template Configuration

```yaml
# pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker
  namespace: airflow
spec:
  serviceAccountName: airflow-worker
  securityContext:
    runAsUser: 50000
    runAsGroup: 50000
    fsGroup: 50000
  containers:
    - name: base
      image: apache/airflow:2.7.0
      resources:
        requests:
          memory: "512Mi"
          cpu: "500m"
        limits:
          memory: "1Gi"
          cpu: "1000m"
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: connection-string
      volumeMounts:
        - name: airflow-dags
          mountPath: /opt/airflow/dags
        - name: airflow-logs
          mountPath: /opt/airflow/logs
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-dags-pvc
    - name: airflow-logs
      persistentVolumeClaim:
        claimName: airflow-logs-pvc
  restartPolicy: Never
```
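
The executor expects the worker container in the template to be named `base`. A quick local sanity check (a sketch, assuming the file path used above) can catch a mismatch before deployment:

```python
# Hedged sketch: validate a pod template file before pointing pod_template_file at it.
import yaml

with open("pod_template.yaml") as f:  # assumed local path
    template = yaml.safe_load(f)

names = [c["name"] for c in template["spec"]["containers"]]
assert "base" in names, "pod template must define a container named 'base'"
print("Template OK, containers:", names)
```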

### Local Kubernetes Executor Setup

```ini
# airflow.cfg for LocalKubernetesExecutor
[core]
executor = LocalKubernetesExecutor

[local_kubernetes_executor]
kubernetes_queue = kubernetes

[kubernetes_executor]
namespace = airflow-k8s
worker_container_repository = my-registry/airflow
delete_worker_pods = True

# Tasks outside the 'kubernetes' queue run locally via LocalExecutor,
# so the metadata database must support parallel connections (e.g. PostgreSQL or MySQL).
```

### Task with Kubernetes Executor Config

```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def my_task():
    print("Running on Kubernetes!")
    return "Task completed"

dag = DAG(
    'kubernetes_executor_example',
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
)

# Task with custom executor config
k8s_task = PythonOperator(
    task_id='kubernetes_task',
    python_callable=my_task,
    executor_config={
        'KubernetesExecutor': {
            'namespace': 'custom-namespace',
            'image': 'custom-image:latest',
            'request_memory': '1Gi',
            'request_cpu': '500m',
            'limit_memory': '2Gi',
            'limit_cpu': '1000m',
            'labels': {'team': 'data-engineering'},
            'annotations': {'monitoring': 'enabled'},
            'env_vars': [
                {'name': 'CUSTOM_VAR', 'value': 'custom_value'}
            ],
            'secrets': [
                {
                    'deploy_type': 'env',
                    'deploy_target': 'DB_PASSWORD',
                    'secret': 'database-secret',
                    'key': 'password'
                }
            ],
            'volumes': [
                {
                    'name': 'data-volume',
                    'persistentVolumeClaim': {
                        'claimName': 'data-pvc'
                    }
                }
            ],
            'volume_mounts': [
                {
                    'name': 'data-volume',
                    'mountPath': '/data'
                }
            ],
            'node_selector': {'disktype': 'ssd'},
            'affinity': {
                'nodeAffinity': {
                    'requiredDuringSchedulingIgnoredDuringExecution': {
                        'nodeSelectorTerms': [{
                            'matchExpressions': [{
                                'key': 'kubernetes.io/arch',
                                'operator': 'In',
                                'values': ['amd64']
                            }]
                        }]
                    }
                }
            },
            'tolerations': [
                {
                    'key': 'compute-type',
                    'operator': 'Equal',
                    'value': 'gpu',
                    'effect': 'NoSchedule'
                }
            ]
        }
    },
    dag=dag
)
```
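
Recent Airflow 2.x releases document `executor_config` as a `pod_override` carrying a `kubernetes.client` `V1Pod`. A minimal sketch of that form, reusing resource values from the example above (not a full translation of it):

```python
# pod_override form of executor_config; a minimal, hedged sketch.
from kubernetes.client import models as k8s

pod_override_task = PythonOperator(
    task_id='pod_override_task',
    python_callable=my_task,
    executor_config={
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",  # the worker container is named 'base'
                        resources=k8s.V1ResourceRequirements(
                            requests={"cpu": "500m", "memory": "1Gi"},
                            limits={"cpu": "1000m", "memory": "2Gi"},
                        ),
                    )
                ],
                node_selector={"disktype": "ssd"},
            )
        )
    },
    dag=dag,
)
```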

### Queue-Based Executor Routing

```python
# Tasks for LocalKubernetesExecutor with queue routing

# This task runs locally (default queue)
local_task = PythonOperator(
    task_id='local_task',
    python_callable=lambda: print("Running locally!"),
    dag=dag
)

# This task runs on Kubernetes (kubernetes queue)
k8s_task = PythonOperator(
    task_id='k8s_task',
    python_callable=lambda: print("Running on Kubernetes!"),
    queue='kubernetes',  # Routes to KubernetesExecutor
    dag=dag
)

# Custom queue name: routes to Kubernetes only if kubernetes_queue
# is set to 'gpu-queue' in airflow.cfg
gpu_task = PythonOperator(
    task_id='gpu_task',
    python_callable=lambda: print("Running on GPU nodes!"),
    queue='gpu-queue',
    executor_config={
        'KubernetesExecutor': {
            'namespace': 'gpu-namespace',
            'node_selector': {'accelerator': 'nvidia-gpu'},
            'tolerations': [
                {
                    'key': 'nvidia.com/gpu',
                    'operator': 'Exists',
                    'effect': 'NoSchedule'
                }
            ],
            'resources': {
                'limits': {
                    'nvidia.com/gpu': '1'
                }
            }
        }
    },
    dag=dag
)
```
574
575
### High Availability Configuration
576
577
```ini
# airflow.cfg for HA Kubernetes Executor setup
[kubernetes_executor]
namespace = airflow-prod

# Multi-namespace for isolation
multi_namespace_mode = True
multi_namespace_mode_namespace_list = airflow-prod,airflow-staging

# High throughput configuration
worker_pods_creation_batch_size = 10
task_publish_max_retries = 5

# Pod cleanup configuration
delete_worker_pods = True
delete_worker_pods_on_failure = False

# Fatal container states that should fail tasks immediately
worker_pod_pending_fatal_container_state_reasons = CreateContainerConfigError,ErrImagePull,CreateContainerError,ImageInspectError,InvalidImageName

# Networking configuration
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6

# SSL configuration for production
verify_ssl = True
ssl_ca_cert = /etc/ssl/certs/ca-certificates.crt

# Resource management
delete_option_kwargs = {"grace_period_seconds": 30, "propagation_policy": "Foreground"}
```

### Monitoring and Observability

```python
# Task with enhanced monitoring configuration
monitored_task = PythonOperator(
    task_id='monitored_task',
    python_callable=my_task,
    executor_config={
        'KubernetesExecutor': {
            'labels': {
                'app': 'airflow',
                'component': 'worker',
                'version': '2.7.0',
                'team': 'data-platform'
            },
            'annotations': {
                'prometheus.io/scrape': 'true',
                'prometheus.io/port': '8080',
                'logging.coreos.com/enabled': 'true'
            }
        }
    },
    dag=dag
)
```
636
637
### Cleanup and Maintenance
638
639
```python
def cleanup_completed_pods(**context):
    """Clean up completed pods older than a specified age."""
    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
    from datetime import datetime, timedelta, timezone

    hook = KubernetesHook(conn_id='kubernetes_default')
    client = hook.core_v1_client  # CoreV1Api client for pod operations

    # Get all worker pods in the airflow namespace
    pods = client.list_namespaced_pod(
        namespace='airflow',
        label_selector='airflow-worker=true'
    )

    # Use an aware datetime so it can be compared with pod creation timestamps
    cutoff_time = datetime.now(timezone.utc) - timedelta(hours=24)

    for pod in pods.items:
        # Check if pod is completed and old
        if (pod.status.phase in ['Succeeded', 'Failed'] and
                pod.metadata.creation_timestamp < cutoff_time):

            client.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace='airflow',
                grace_period_seconds=0
            )
            print(f"Deleted pod: {pod.metadata.name}")

# Scheduling belongs on the DAG, not the operator; run this task from a daily DAG.
cleanup_task = PythonOperator(
    task_id='cleanup_old_pods',
    python_callable=cleanup_completed_pods,
    dag=dag
)
```