Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution

npx @tessl/cli install tessl/pypi-apache-airflow-providers-cncf-kubernetes@10.7.00

# Apache Airflow Providers CNCF Kubernetes

A comprehensive Kubernetes integration provider for Apache Airflow that enables running workflows and tasks on Kubernetes clusters. The provider offers operators for managing pods and jobs, hooks for Kubernetes API interactions, sensors for monitoring workloads, triggers for asynchronous operations, decorators for task creation, and executors for running Airflow tasks as Kubernetes pods.

## Package Information

- **Package Name**: apache-airflow-providers-cncf-kubernetes
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-cncf-kubernetes`
- **Minimum Airflow Version**: 2.10.0

## Core Imports

```python
# Main provider package
import airflow.providers.cncf.kubernetes

# Core operators
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Hooks for API interactions
from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook, AsyncKubernetesHook

# Sensors for monitoring
from airflow.providers.cncf.kubernetes.sensors.spark_kubernetes import SparkKubernetesSensor

# Decorators for task creation
from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task
from airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd import kubernetes_cmd_task

# Executors
from airflow.providers.cncf.kubernetes.executors.kubernetes_executor import KubernetesExecutor
```

## Basic Usage

### Running a Task in a Kubernetes Pod

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Define the DAG
dag = DAG(
    'kubernetes_pod_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False,
)

# Run a simple task in a Kubernetes pod
task = KubernetesPodOperator(
    task_id='hello_kubernetes',
    name='hello-pod',
    namespace='default',
    image='python:3.9-slim',
    cmds=['python', '-c'],
    arguments=['print("Hello from Kubernetes!")'],
    dag=dag,
)
```

### Using Kubernetes Task Decorator

```python
from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default',
)
def process_data():
    import pandas as pd
    # Task logic here
    return "Processing complete"

# Use in a DAG
result = process_data()
```

### Managing Kubernetes Jobs

```python
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

job_task = KubernetesJobOperator(
    task_id='data_processing_job',
    name='data-job',
    namespace='default',
    image='data-processor:latest',
    cmds=['python', 'process.py'],
    dag=dag,
)
```

## Architecture

The provider is organized around several key components:

- **Operators**: Execute tasks on Kubernetes (pods, jobs, resources, Spark applications)
- **Hooks**: Provide low-level API access to Kubernetes clusters (sync and async)
- **Sensors**: Monitor Kubernetes workloads for completion or state changes
- **Triggers**: Enable asynchronous monitoring of pods and jobs
- **Decorators**: Simplify task creation with Python decorators
- **Executors**: Run Airflow task execution entirely on Kubernetes infrastructure
- **Utilities**: Support pod lifecycle management, resource conversion, and configuration

## Capabilities

### Pod Operations

Execute individual tasks in Kubernetes pods with full lifecycle management, including creation, monitoring, log retrieval, and cleanup.

```python { .api }
class KubernetesPodOperator(BaseOperator):
    def __init__(
        self,
        image: str,
        name: str | None = None,
        namespace: str = "default",
        cmds: list[str] | None = None,
        arguments: list[str] | None = None,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any: ...
```

[Pod Operations](./pod-operations.md)

### Job Management

Create, monitor, and manage Kubernetes Jobs with support for batch processing, parallel execution, and job lifecycle operations.

```python { .api }
class KubernetesJobOperator(KubernetesPodOperator):
    def __init__(
        self,
        name: str,
        image: str,
        namespace: str = "default",
        **kwargs
    ): ...
```

[Job Management](./job-management.md)

### Kubernetes API Integration

Connect to Kubernetes clusters with comprehensive API access, supporting both synchronous and asynchronous operations.

```python { .api }
class KubernetesHook(BaseHook):
    def get_conn(self): ...
    def create_custom_object(self, group: str, version: str, plural: str, body: dict, namespace: str | None = None): ...
    def get_pod(self, name: str, namespace: str): ...
    def create_job(self, job: V1Job): ...
```

[API Integration](./api-integration.md)

### Resource Management

Create, update, and delete Kubernetes resources from YAML manifests or programmatic definitions.

```python { .api }
class KubernetesCreateResourceOperator(KubernetesResourceBaseOperator):
    def __init__(
        self,
        yaml_conf: str | None = None,
        custom_resource_definition: dict | None = None,
        **kwargs
    ): ...
```

[Resource Management](./resource-management.md)

### Spark on Kubernetes

Deploy and manage Spark applications on Kubernetes clusters with custom resource definitions and monitoring.

```python { .api }
class SparkKubernetesOperator(KubernetesPodOperator):
    def __init__(
        self,
        application_file: str,
        namespace: str = "default",
        **kwargs
    ): ...
```
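
The `application_file` typically points at a `SparkApplication` manifest for the Kubernetes spark-operator. A minimal sketch of such a manifest (image, file paths, and sizing are illustrative):

```yaml
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Python
  mode: cluster
  image: spark:3.5.0
  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py
  driver:
    cores: 1
    memory: 512m
  executor:
    instances: 2
    cores: 1
    memory: 512m
```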

[Spark Integration](./spark-integration.md)

### Task Decorators

Create Kubernetes tasks using Python decorators with automatic pod configuration and execution.

```python { .api }
def kubernetes_task(
    image: str,
    namespace: str = "default",
    name: str | None = None,
    **kwargs
): ...

def kubernetes_cmd_task(
    image: str,
    cmds: list[str],
    namespace: str = "default",
    **kwargs
): ...
```

[Task Decorators](./decorators.md)

### Monitoring and Sensors

Monitor Kubernetes workloads with sensors that check application status and wait for completion conditions.

```python { .api }
class SparkKubernetesSensor(BaseSensorOperator):
    def __init__(
        self,
        application_name: str,
        namespace: str = "default",
        **kwargs
    ): ...
```

[Monitoring](./monitoring.md)

### Executors

Run Airflow tasks on Kubernetes infrastructure with the KubernetesExecutor and hybrid LocalKubernetesExecutor.

```python { .api }
class KubernetesExecutor(BaseExecutor):
    def start(self): ...
    def sync(self): ...
    def end(self): ...
```
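
The executor is enabled through Airflow configuration rather than DAG code. A sketch of the relevant `airflow.cfg` entries (the namespace, repository, and tag values are illustrative):

```ini
[core]
# Run every task as its own Kubernetes pod
executor = KubernetesExecutor

[kubernetes_executor]
namespace = airflow
worker_container_repository = apache/airflow
worker_container_tag = 2.10.0
```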

[Executors](./executors.md)

## Types

```python { .api }
# Pod event enumeration
class PodEventType(Enum):
    """Type of events emitted by a Kubernetes pod."""
    WARNING = "Warning"
    NORMAL = "Normal"

# Container state enumeration
class ContainerState(str, Enum):
    """
    Possible container states.

    See https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#pod-phase.
    """
    WAITING = "waiting"
    RUNNING = "running"
    TERMINATED = "terminated"
    FAILED = "failed"
    UNDEFINED = "undefined"

# Execution mode for callbacks
class ExecutionMode(str, Enum):
    """Enum class for execution mode."""
    SYNC = "sync"
    ASYNC = "async"

# Pod phase constants
class PodPhase:
    """Pod phase constants for lifecycle management."""
    PENDING = "Pending"
    RUNNING = "Running"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"

# Actions to take when the pod finishes
class OnFinishAction(str, Enum):
    """Actions to take when the pod finishes execution."""
    KEEP_POD = "keep_pod"
    DELETE_POD = "delete_pod"
    DELETE_SUCCEEDED_POD = "delete_succeeded_pod"
```

## Exception Types

```python { .api }
# Pod operation exceptions
class PodMutationHookException(AirflowException):
    """Raised when an exception happens during pod mutation hook execution."""
    ...

class PodReconciliationError(AirflowException):
    """Raised when an error is encountered while trying to merge pod configs."""
    ...

class PodReattachFailure(AirflowException):
    """Raised when we expect to be able to find a pod but cannot."""
    ...

class PodCredentialsExpiredFailure(AirflowException):
    """Raised when a pod fails to refresh its credentials."""
    ...

# Pod manager exceptions
class PodLaunchFailedException(AirflowException):
    """Raised when pod launching fails in KubernetesPodOperator."""
    ...

class PodLaunchTimeoutException(AirflowException):
    """Raised when a pod does not leave the Pending phase within the specified timeout."""
    ...

class PodNotFoundException(AirflowException):
    """Raised when an expected pod does not exist in the Kubernetes API."""
    ...

# Resource operation exceptions
class FailToDeleteError(Exception):
    """Raised when an error occurs while handling a YAML file during resource deletion."""
    ...
```