# Task Decorators

Create Kubernetes tasks using Python decorators with automatic pod configuration and execution. Decorators provide a simplified interface for running Python functions and shell commands in Kubernetes pods.

## Capabilities

### Kubernetes Task Decorator

Execute Python functions in Kubernetes pods with automatic serialization, pod management, and result handling.

```python { .api }
def kubernetes_task(
    image: str,
    namespace: str = "default",
    name: str | None = None,
    random_name_suffix: bool = True,
    reattach_on_restart: bool = True,
    startup_timeout_seconds: int = 120,
    get_logs: bool = True,
    image_pull_policy: str = "IfNotPresent",
    cmds: list[str] | None = None,
    arguments: list[str] | None = None,
    ports: list | None = None,
    volume_mounts: list | None = None,
    volumes: list | None = None,
    env_vars: list | None = None,
    secrets: list | None = None,
    configmaps: list[str] | None = None,
    labels: dict[str, str] | None = None,
    node_selector: dict[str, str] | None = None,
    affinity: dict | None = None,
    tolerations: list | None = None,
    security_context: dict | None = None,
    container_resources: dict | None = None,
    service_account_name: str | None = None,
    is_delete_operator_pod: bool = True,
    hostnetwork: bool = False,
    pod_template_file: str | None = None,
    pod_template_dict: dict | None = None,
    full_pod_spec: dict | None = None,
    init_containers: list | None = None,
    sidecars: list | None = None,
    cluster_context: str | None = None,
    config_file: str | None = None,
    in_cluster: bool | None = None,
    conn_id: str = "kubernetes_default",
    do_xcom_push: bool = True,
    task_id: str | None = None,
    **kwargs
):
    """
    Decorator to create a Kubernetes task from a Python function.

    This decorator converts a Python function into a KubernetesPodOperator task
    that executes the function inside a Kubernetes pod.

    Args:
        image (str): Docker image to use for the pod
        namespace (str): Kubernetes namespace. Default: 'default'
        name (str, optional): Name of the pod. Auto-generated if not provided
        random_name_suffix (bool): Add a random suffix to the pod name. Default: True
        reattach_on_restart (bool): Reattach to an existing pod on restart. Default: True
        startup_timeout_seconds (int): Pod startup timeout in seconds. Default: 120
        get_logs (bool): Retrieve pod logs. Default: True
        image_pull_policy (str): Image pull policy. Default: 'IfNotPresent'
        cmds (list[str], optional): Container command override
        arguments (list[str], optional): Container arguments override
        ports (list, optional): Container ports to expose
        volume_mounts (list, optional): Volume mounts for the pod
        volumes (list, optional): Volumes to attach to the pod
        env_vars (list, optional): Environment variables
        secrets (list, optional): Kubernetes secrets to mount
        configmaps (list[str], optional): ConfigMaps to mount
        labels (dict[str, str], optional): Pod labels
        node_selector (dict[str, str], optional): Node selection constraints
        affinity (dict, optional): Pod affinity rules
        tolerations (list, optional): Pod tolerations
        security_context (dict, optional): Security context
        container_resources (dict, optional): Resource limits and requests
        service_account_name (str, optional): Service account name
        is_delete_operator_pod (bool): Delete the pod after execution. Default: True
        hostnetwork (bool): Use host networking. Default: False
        pod_template_file (str, optional): Path to a pod template file
        pod_template_dict (dict, optional): Pod template as a dictionary
        full_pod_spec (dict, optional): Complete pod specification
        init_containers (list, optional): Init containers for the pod
        sidecars (list, optional): Sidecar containers
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to a kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        do_xcom_push (bool): Push the return value to XCom. Default: True
        task_id (str, optional): Task ID override
        **kwargs: Additional arguments passed to the operator

    Returns:
        Decorated function that creates a KubernetesPodOperator task
    """
    ...
```
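
A minimal sketch of the decorator in use, assuming it is called inside an active DAG definition; because `do_xcom_push` defaults to True, the function's return value is pushed to XCom:

```python
from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task

@kubernetes_task(image='python:3.9-slim')
def say_hello():
    # Runs inside the pod; the return value is pushed to XCom.
    return 'hello from the pod'

hello = say_hello()  # an XComArg consumable by downstream tasks
```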

### Kubernetes Command Task Decorator

Execute shell commands in Kubernetes pods with simplified command specification and output handling.

```python { .api }
def kubernetes_cmd_task(
    image: str,
    cmds: list[str],
    namespace: str = "default",
    name: str | None = None,
    random_name_suffix: bool = True,
    reattach_on_restart: bool = True,
    startup_timeout_seconds: int = 120,
    get_logs: bool = True,
    image_pull_policy: str = "IfNotPresent",
    arguments: list[str] | None = None,
    ports: list | None = None,
    volume_mounts: list | None = None,
    volumes: list | None = None,
    env_vars: list | None = None,
    secrets: list | None = None,
    configmaps: list[str] | None = None,
    labels: dict[str, str] | None = None,
    node_selector: dict[str, str] | None = None,
    affinity: dict | None = None,
    tolerations: list | None = None,
    security_context: dict | None = None,
    container_resources: dict | None = None,
    service_account_name: str | None = None,
    is_delete_operator_pod: bool = True,
    hostnetwork: bool = False,
    pod_template_file: str | None = None,
    pod_template_dict: dict | None = None,
    full_pod_spec: dict | None = None,
    init_containers: list | None = None,
    sidecars: list | None = None,
    cluster_context: str | None = None,
    config_file: str | None = None,
    in_cluster: bool | None = None,
    conn_id: str = "kubernetes_default",
    do_xcom_push: bool = True,
    task_id: str | None = None,
    **kwargs
):
    """
    Decorator to create a Kubernetes command task.

    This decorator creates a KubernetesPodOperator task that executes
    the specified shell commands inside a Kubernetes pod.

    Args:
        image (str): Docker image to use for the pod
        cmds (list[str]): Shell commands to execute
        namespace (str): Kubernetes namespace. Default: 'default'
        name (str, optional): Name of the pod. Auto-generated if not provided
        random_name_suffix (bool): Add a random suffix to the pod name. Default: True
        reattach_on_restart (bool): Reattach to an existing pod on restart. Default: True
        startup_timeout_seconds (int): Pod startup timeout in seconds. Default: 120
        get_logs (bool): Retrieve pod logs. Default: True
        image_pull_policy (str): Image pull policy. Default: 'IfNotPresent'
        arguments (list[str], optional): Arguments for the commands
        ports (list, optional): Container ports to expose
        volume_mounts (list, optional): Volume mounts for the pod
        volumes (list, optional): Volumes to attach to the pod
        env_vars (list, optional): Environment variables
        secrets (list, optional): Kubernetes secrets to mount
        configmaps (list[str], optional): ConfigMaps to mount
        labels (dict[str, str], optional): Pod labels
        node_selector (dict[str, str], optional): Node selection constraints
        affinity (dict, optional): Pod affinity rules
        tolerations (list, optional): Pod tolerations
        security_context (dict, optional): Security context
        container_resources (dict, optional): Resource limits and requests
        service_account_name (str, optional): Service account name
        is_delete_operator_pod (bool): Delete the pod after execution. Default: True
        hostnetwork (bool): Use host networking. Default: False
        pod_template_file (str, optional): Path to a pod template file
        pod_template_dict (dict, optional): Pod template as a dictionary
        full_pod_spec (dict, optional): Complete pod specification
        init_containers (list, optional): Init containers for the pod
        sidecars (list, optional): Sidecar containers
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to a kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        do_xcom_push (bool): Push the return value to XCom. Default: True
        task_id (str, optional): Task ID override
        **kwargs: Additional arguments passed to the operator

    Returns:
        Decorated function that creates a KubernetesPodOperator task
    """
    ...
```
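
A minimal sketch under the semantics above: the container runs `cmds` plus `arguments`, and the decorated function's body is only a placeholder for the task definition:

```python
from airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd import kubernetes_cmd_task

@kubernetes_cmd_task(
    image='alpine:latest',
    cmds=['sh', '-c'],
    arguments=['echo "hello from the pod"'],
)
def hello_cmd():
    """Placeholder body; the pod runs the command above."""
    pass

hello = hello_cmd()
```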

### Internal Decorated Operators

Internal operator classes used by the decorators (not typically used directly).

```python { .api }
class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
    """Internal decorated operator for Kubernetes tasks."""
    ...

class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
    """Internal decorated operator for Kubernetes command tasks."""
    ...
```

## Usage Examples

### Basic Python Function Execution

```python
from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default'
)
def process_data():
    """Simple data processing function."""
    # Note: python:3.9-slim does not ship pandas or numpy; use an image
    # that provides them, or install packages at runtime as shown in the
    # next example.
    import pandas as pd
    import numpy as np

    # Create sample data
    data = pd.DataFrame({
        'values': np.random.randn(1000),
        'categories': np.random.choice(['A', 'B', 'C'], 1000)
    })

    # Process data
    result = data.groupby('categories')['values'].mean().to_dict()
    print(f"Processing complete: {result}")

    return result

# Use in a DAG
result = process_data()
```
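
Because the returned dictionary is pushed to XCom, a downstream task can consume it directly; a sketch using Airflow's standard `@task` decorator:

```python
from airflow.decorators import task

@task
def report(stats: dict):
    # 'stats' is the dict returned by process_data, pulled from XCom.
    for category, mean in stats.items():
        print(f"{category}: {mean:.3f}")

report(result)  # passing the XComArg sets process_data >> report
```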

### Function with Dependencies and Packages

```python
@kubernetes_task(
    image='python:3.9',
    namespace='data-processing',
    env_vars=[
        {'name': 'PYTHONPATH', 'value': '/opt/app'},
        {'name': 'DATA_SOURCE', 'value': 'production'}
    ]
)
def analyze_data():
    """Data analysis with external libraries."""
    # Install packages at runtime
    import subprocess
    import sys

    subprocess.check_call([
        sys.executable, '-m', 'pip', 'install',
        'scikit-learn==1.3.0', 'matplotlib==3.7.2'
    ])

    # Now use the packages
    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score

    # Generate sample data
    X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Train a model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)

    print(f"Model accuracy: {accuracy:.4f}")
    return {'accuracy': accuracy, 'n_samples': len(X)}

analysis_result = analyze_data()
```
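
Installing packages at runtime adds startup latency and a network dependency to every run; a common alternative is to bake dependencies into a custom image. A sketch, where `registry.example.com/ml-base:1.0` is a hypothetical image that already contains scikit-learn:

```python
@kubernetes_task(
    image='registry.example.com/ml-base:1.0',  # hypothetical prebaked image
    namespace='data-processing',
)
def analyze_data_prebaked():
    # No runtime pip install needed; the image provides the packages.
    from sklearn.datasets import make_classification
    X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
    return {'n_samples': len(X)}
```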

### Function with Volume Mounts

```python
from kubernetes.client import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default',
    volumes=[
        V1Volume(
            name='data-volume',
            persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                claim_name='shared-data-pvc'
            )
        )
    ],
    volume_mounts=[
        V1VolumeMount(
            name='data-volume',
            mount_path='/data'
        )
    ]
)
def process_files():
    """Process files from the mounted volume."""
    import os
    import json

    data_dir = '/data'
    results = []

    # Process all JSON files in the data directory
    for filename in os.listdir(data_dir):
        if filename.endswith('.json'):
            filepath = os.path.join(data_dir, filename)
            with open(filepath, 'r') as f:
                data = json.load(f)
            results.append({
                'file': filename,
                'record_count': len(data) if isinstance(data, list) else 1
            })

    # Write results
    with open('/data/processing_results.json', 'w') as f:
        json.dump(results, f, indent=2)

    return {'processed_files': len(results)}

file_processing = process_files()
```

### Function with Secrets and ConfigMaps

```python
from airflow.providers.cncf.kubernetes.secret import Secret
from kubernetes.client import V1EnvVar

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default',
    secrets=[
        Secret('env', 'DB_PASSWORD', 'database-secret', 'password'),
        Secret('env', 'API_KEY', 'api-secret', 'key')
    ],
    env_vars=[
        V1EnvVar(name='DB_HOST', value='postgresql.default.svc.cluster.local'),
        V1EnvVar(name='DB_NAME', value='analytics')
    ],
    configmaps=['app-config']
)
def database_operation():
    """Perform database operations with secrets."""
    import os
    import subprocess
    import sys

    # Install the database client at runtime
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'psycopg2-binary'])

    import psycopg2

    # Get credentials from the environment (injected from secrets)
    db_host = os.environ['DB_HOST']
    db_name = os.environ['DB_NAME']
    db_password = os.environ['DB_PASSWORD']

    # Connect and query
    conn = psycopg2.connect(
        host=db_host,
        database=db_name,
        user='analytics_user',
        password=db_password
    )

    cursor = conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM user_events WHERE created_at >= NOW() - INTERVAL '1 day'")
    daily_events = cursor.fetchone()[0]

    cursor.close()
    conn.close()

    return {'daily_events': daily_events}

db_task = database_operation()
```
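
Secrets can also be mounted as files rather than environment variables by using the `'volume'` deploy type; a short sketch (the mount path is illustrative):

```python
from airflow.providers.cncf.kubernetes.secret import Secret

# Mounts every key of 'database-secret' as a file under /etc/db-creds.
file_secret = Secret(
    deploy_type='volume',
    deploy_target='/etc/db-creds',
    secret='database-secret',
)

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default',
    secrets=[file_secret],
)
def read_credentials():
    # The secret key 'password' appears as the file /etc/db-creds/password.
    with open('/etc/db-creds/password') as f:
        return {'password_length': len(f.read())}
```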

### Command Task Examples

```python
from airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd import kubernetes_cmd_task

@kubernetes_cmd_task(
    image='ubuntu:20.04',
    cmds=['bash', '-c'],
    arguments=['echo "Starting data backup" && tar -czf /backup/data-$(date +%Y%m%d).tar.gz /data'],
    namespace='backups'
)
def backup_data():
    """Simple backup command."""
    pass

backup_task = backup_data()
```
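
`arguments` is a templated field on KubernetesPodOperator, so assuming the decorator preserves the operator's templating (an assumption, not stated in this reference), Jinja expressions such as `{{ ds }}` can parameterize the command per run:

```python
@kubernetes_cmd_task(
    image='ubuntu:20.04',
    cmds=['bash', '-c'],
    # Assumes Jinja templating is applied; '{{ ds }}' renders to the
    # run's logical date, e.g. 2023-01-01.
    arguments=['tar -czf /backup/data-{{ ds }}.tar.gz /data'],
    namespace='backups'
)
def backup_data_templated():
    """Placeholder body; the rendered command runs in the pod."""
    pass

backup_templated = backup_data_templated()
```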

### Advanced Command Task with Multiple Steps

```python
@kubernetes_cmd_task(
    image='alpine:latest',
    cmds=['sh', '-c'],
    arguments=['''
        set -e
        echo "Installing dependencies..."
        apk add --no-cache curl jq

        echo "Downloading data..."
        curl -o /tmp/data.json "$API_ENDPOINT/data"

        echo "Processing data..."
        cat /tmp/data.json | jq '.results | length'

        echo "Uploading results..."
        curl -X POST -H "Content-Type: application/json" \\
            -d @/tmp/data.json \\
            "$WEBHOOK_URL/processed"

        echo "Process completed successfully"
    '''],
    namespace='default',
    env_vars=[
        {'name': 'API_ENDPOINT', 'value': 'https://api.example.com'},
        {'name': 'WEBHOOK_URL', 'value': 'https://webhook.example.com'}
    ]
)
def api_data_processor():
    """Multi-step API data processing; the script reads its endpoints
    from the environment variables defined above."""
    pass

api_task = api_data_processor()
```

### Task with Resource Limits

```python
@kubernetes_task(
    image='python:3.9',
    namespace='resource-limited',
    container_resources={
        'requests': {
            'cpu': '100m',
            'memory': '256Mi'
        },
        'limits': {
            'cpu': '500m',
            'memory': '1Gi'
        }
    }
)
def resource_intensive_task():
    """Task with specific resource requirements."""
    import time
    # Note: the plain python:3.9 image does not include numpy; use an
    # image that provides it, or install it at runtime.
    import numpy as np

    # Simulate CPU-intensive work
    large_array = np.random.randn(10000, 1000)
    result = np.linalg.svd(large_array)

    print(f"SVD computation completed. Shape: {result[0].shape}")

    # Simulate some processing time
    time.sleep(10)

    # Return a list so the shape is JSON-serializable for XCom
    return {'status': 'completed', 'array_size': list(large_array.shape)}

intensive_task = resource_intensive_task()
```
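
Depending on the provider version, `container_resources` may expect a typed `kubernetes.client.V1ResourceRequirements` object rather than a plain dict; a hedged equivalent of the limits above:

```python
from kubernetes.client import V1ResourceRequirements

# Same resource spec as a typed object; some provider versions require
# this form instead of a plain dict.
resources = V1ResourceRequirements(
    requests={'cpu': '100m', 'memory': '256Mi'},
    limits={'cpu': '500m', 'memory': '1Gi'},
)

@kubernetes_task(
    image='python:3.9',
    namespace='resource-limited',
    container_resources=resources,
)
def bounded_task():
    return {'status': 'completed'}
```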

### Task with Node Selection

```python
@kubernetes_task(
    image='tensorflow/tensorflow:2.13.0-gpu',
    namespace='ml-training',
    node_selector={'accelerator': 'nvidia-gpu'},
    tolerations=[
        {
            'key': 'nvidia.com/gpu',
            'operator': 'Exists',
            'effect': 'NoSchedule'
        }
    ],
    container_resources={
        'limits': {
            'nvidia.com/gpu': '1'
        }
    }
)
def gpu_training_task():
    """Machine learning task requiring a GPU."""
    import tensorflow as tf

    # Check GPU availability (tf.config.list_physical_devices replaces
    # the deprecated tf.config.experimental variant)
    gpus = tf.config.list_physical_devices('GPU')
    print(f"Available GPUs: {len(gpus)}")

    if gpus:
        # Simple GPU computation
        with tf.device('/GPU:0'):
            matrix_a = tf.random.normal([1000, 1000])
            matrix_b = tf.random.normal([1000, 1000])
            result = tf.matmul(matrix_a, matrix_b)

        print(f"GPU computation completed. Result shape: {result.shape}")
        return {'gpu_used': True, 'result_shape': str(result.shape)}
    else:
        print("No GPU available, using CPU")
        return {'gpu_used': False}

gpu_task = gpu_training_task()
```

### DAG with Multiple Decorated Tasks

```python
from airflow import DAG
from datetime import datetime

dag = DAG(
    'kubernetes_decorated_workflow',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False
)

@kubernetes_task(
    image='python:3.9-slim',
    namespace='data-pipeline',
    dag=dag
)
def extract_data():
    """Extract data from a source."""
    import random

    # Simulate data extraction
    data = [
        {'id': i, 'value': random.randint(1, 100), 'category': random.choice(['A', 'B', 'C'])}
        for i in range(1000)
    ]

    print(f"Extracted {len(data)} records")
    return data

@kubernetes_task(
    image='python:3.9-slim',
    namespace='data-pipeline',
    dag=dag
)
def transform_data(raw_data):
    """Transform extracted data."""
    import statistics

    # Group values by category
    categories = {}
    for record in raw_data:
        cat = record['category']
        if cat not in categories:
            categories[cat] = []
        categories[cat].append(record['value'])

    # Calculate statistics for each category
    stats = {}
    for cat, values in categories.items():
        stats[cat] = {
            'count': len(values),
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            'min': min(values),
            'max': max(values)
        }

    print(f"Transformed data for {len(stats)} categories")
    return stats

@kubernetes_cmd_task(
    image='alpine:latest',
    cmds=['sh', '-c'],
    arguments=['echo "Loading data..." && sleep 5 && echo "Data loaded successfully"'],
    namespace='data-pipeline',
    dag=dag
)
def load_data():
    """Load processed data."""
    pass

# Set up the pipeline; passing raw_data into transform_data already wires
# extract_data >> transform_data, so only the final edge is explicit.
raw_data = extract_data()
processed_data = transform_data(raw_data)
load_task = load_data()

processed_data >> load_task
```
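
The same pipeline can be written with the `@dag` decorator, which keeps task wiring inside one function; a sketch assuming Airflow 2.4+ (`schedule` parameter) and that the decorated tasks above are defined without the explicit `dag=dag` argument:

```python
from datetime import datetime
from airflow.decorators import dag

@dag(start_date=datetime(2023, 1, 1), schedule='@daily', catchup=False)
def kubernetes_decorated_workflow_v2():
    # Data passing sets extract >> transform implicitly; the final edge
    # to the command task is added explicitly.
    stats = transform_data(extract_data())
    stats >> load_data()

workflow = kubernetes_decorated_workflow_v2()
```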