# Job Management

Create, monitor, and manage Kubernetes Jobs for batch processing, parallel execution, and complex workload orchestration. Jobs provide reliable execution with automatic retry, parallel processing, and completion tracking.

## Capabilities

### Job Execution

Execute batch workloads as Kubernetes Jobs with support for parallel processing, completion tracking, and automatic cleanup.
```python { .api }
class KubernetesJobOperator(KubernetesPodOperator):
    """
    Executes a Kubernetes Job.

    Args:
        name (str): Name of the job
        image (str): Docker image to run
        namespace (str): Kubernetes namespace. Defaults to 'default'
        parallelism (int, optional): Number of parallel pods
        completions (int, optional): Number of successful completions needed
        backoff_limit (int, optional): Number of retries before marking job failed
        ttl_seconds_after_finished (int, optional): TTL for cleanup after completion
        active_deadline_seconds (int, optional): Maximum duration for job execution
        suspend (bool): Whether to suspend the job. Default: False
        manual_selector (bool): Manually manage pod selector. Default: False
        completion_mode (str, optional): Completion mode ('NonIndexed' or 'Indexed')
        pod_failure_policy (dict, optional): Pod failure policy configuration
        **kwargs: Additional arguments passed to KubernetesPodOperator
    """
    def __init__(
        self,
        name: str,
        image: str,
        namespace: str = "default",
        parallelism: int | None = None,
        completions: int | None = None,
        backoff_limit: int | None = None,
        ttl_seconds_after_finished: int | None = None,
        active_deadline_seconds: int | None = None,
        suspend: bool = False,
        manual_selector: bool = False,
        completion_mode: str | None = None,
        pod_failure_policy: dict | None = None,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Execute the job."""
        ...

    def build_job_request_obj(self, context: Context) -> V1Job:
        """Build Kubernetes job request object."""
        ...

    def create_job_request_obj(self, context: Context) -> V1Job:
        """Create job from pod template."""
        ...
```

### Job Deletion

Delete existing Kubernetes Jobs with optional cleanup of associated pods and resources.
```python { .api }
class KubernetesDeleteJobOperator(BaseOperator):
    """
    Delete a Kubernetes Job.

    Args:
        name (str): Name of the job to delete
        namespace (str): Kubernetes namespace. Defaults to 'default'
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        delete_on_success (bool): Delete job on successful completion. Default: True
        delete_on_failure (bool): Delete job on failure. Default: False
        propagation_policy (str): Deletion propagation policy. Default: 'Background'
    """
    def __init__(
        self,
        name: str,
        namespace: str = "default",
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        delete_on_success: bool = True,
        delete_on_failure: bool = False,
        propagation_policy: str = "Background",
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Delete the job."""
        ...
```

### Job Patching

Update existing Kubernetes Jobs with new specifications or configuration changes.
```python { .api }
class KubernetesPatchJobOperator(BaseOperator):
    """
    Patch a Kubernetes Job.

    Args:
        name (str): Name of the job to patch
        namespace (str): Kubernetes namespace
        body (dict): Patch body as dictionary
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
    """
    def __init__(
        self,
        name: str,
        namespace: str,
        body: dict,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool | None = None,
        **kwargs
    ): ...

    def execute(self, context: Context) -> Any:
        """Patch the job."""
        ...
```

### Job Triggers

Asynchronous monitoring of Kubernetes Job completion status with triggers for deferrable execution.
```python { .api }
class KubernetesJobTrigger(BaseTrigger):
    """
    Trigger for monitoring Kubernetes Job completion.

    Args:
        job_name (str): Name of the job to monitor
        job_namespace (str): Namespace of the job
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool): Use in-cluster configuration. Default: True
        get_logs (bool): Retrieve job logs. Default: True
        startup_timeout (int): Startup timeout in seconds. Default: 120
    """
    def __init__(
        self,
        job_name: str,
        job_namespace: str,
        cluster_context: str | None = None,
        config_file: str | None = None,
        in_cluster: bool = True,
        get_logs: bool = True,
        startup_timeout: int = 120,
        **kwargs
    ): ...

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """Monitor job execution asynchronously."""
        ...
```

## Usage Examples

### Basic Job Execution
```python
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Simple batch job
batch_job = KubernetesJobOperator(
    task_id='batch_processing',
    name='data-processor',
    namespace='default',
    image='batch-processor:latest',
    cmds=['python', 'process_batch.py'],
    completions=1,
    parallelism=1,
    dag=dag
)
```

### Parallel Job Processing
```python
# Parallel processing job
parallel_job = KubernetesJobOperator(
    task_id='parallel_processing',
    name='parallel-processor',
    namespace='default',
    image='parallel-processor:latest',
    cmds=['python', 'parallel_process.py'],
    completions=10,   # Need 10 successful completions
    parallelism=3,    # Run 3 pods in parallel
    backoff_limit=2,  # Allow 2 retries per pod
    dag=dag
)
```

### Job with Cleanup Configuration
```python
# Job with automatic cleanup
cleanup_job = KubernetesJobOperator(
    task_id='cleanup_job',
    name='cleanup-processor',
    namespace='default',
    image='cleanup:latest',
    ttl_seconds_after_finished=3600,  # Clean up after 1 hour
    active_deadline_seconds=1800,     # Max 30 minutes execution
    dag=dag
)
```

### Job with Failure Policy
```python
# Job with custom failure handling
failure_policy_job = KubernetesJobOperator(
    task_id='resilient_job',
    name='resilient-processor',
    namespace='default',
    image='resilient:latest',
    backoff_limit=5,
    pod_failure_policy={
        'rules': [
            {
                'action': 'FailJob',
                'onExitCodes': {
                    'containerName': 'main',
                    'operator': 'In',
                    'values': [1, 2, 3]
                }
            },
            {
                'action': 'Ignore',
                'onPodConditions': [
                    {
                        'type': 'DisruptionTarget'
                    }
                ]
            }
        ]
    },
    dag=dag
)
```

### Job Deletion Task
```python
from airflow.providers.cncf.kubernetes.operators.job import KubernetesDeleteJobOperator

# Clean up completed job
delete_job = KubernetesDeleteJobOperator(
    task_id='delete_completed_job',
    name='data-processor',
    namespace='default',
    delete_on_success=True,
    propagation_policy='Foreground',  # Wait for pods to be deleted
    dag=dag
)
```

### Job Status Monitoring
```python
from airflow.providers.cncf.kubernetes.operators.job import KubernetesJobOperator

# Job with monitoring
monitored_job = KubernetesJobOperator(
    task_id='monitored_job',
    name='monitored-processor',
    namespace='default',
    image='processor:latest',
    deferrable=True,  # Use async monitoring
    get_logs=True,
    log_events_on_failure=True,
    dag=dag
)
```

### Indexed Job Processing
```python
# Indexed job for array processing
indexed_job = KubernetesJobOperator(
    task_id='indexed_processing',
    name='indexed-processor',
    namespace='default',
    image='array-processor:latest',
    cmds=['python', 'process_index.py'],
    completions=10,
    parallelism=3,
    completion_mode='Indexed',  # Each pod gets a completion index
    env_vars=[
        V1EnvVar(
            name='JOB_COMPLETION_INDEX',
            value_from=V1EnvVarSource(
                field_ref=V1ObjectFieldSelector(
                    field_path='metadata.annotations["batch.kubernetes.io/job-completion-index"]'
                )
            )
        )
    ],
    dag=dag
)
```

### Job Patching Example
```python
from airflow.providers.cncf.kubernetes.operators.job import KubernetesPatchJobOperator

# Suspend a running job
patch_job = KubernetesPatchJobOperator(
    task_id='suspend_job',
    name='long-running-job',
    namespace='default',
    body={
        'spec': {
            'suspend': True
        }
    },
    dag=dag
)
```