Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Create Kubernetes tasks using Python decorators with automatic pod configuration and execution. Decorators provide a simplified interface for running Python functions and shell commands in Kubernetes pods.
Execute Python functions in Kubernetes pods with automatic serialization, pod management, and result handling.
def kubernetes_task(
    image: str,
    namespace: str = "default",
    name: str | None = None,
    random_name_suffix: bool = True,
    reattach_on_restart: bool = True,
    startup_timeout_seconds: int = 120,
    get_logs: bool = True,
    image_pull_policy: str = "IfNotPresent",
    cmds: list[str] | None = None,
    arguments: list[str] | None = None,
    ports: list | None = None,
    volume_mounts: list | None = None,
    volumes: list | None = None,
    env_vars: list | None = None,
    secrets: list | None = None,
    configmaps: list[str] | None = None,
    labels: dict[str, str] | None = None,
    node_selector: dict[str, str] | None = None,
    affinity: dict | None = None,
    tolerations: list | None = None,
    security_context: dict | None = None,
    container_resources: dict | None = None,
    service_account_name: str | None = None,
    is_delete_operator_pod: bool = True,
    hostnetwork: bool = False,
    pod_template_file: str | None = None,
    pod_template_dict: dict | None = None,
    full_pod_spec: dict | None = None,
    init_containers: list | None = None,
    sidecars: list | None = None,
    cluster_context: str | None = None,
    config_file: str | None = None,
    in_cluster: bool | None = None,
    conn_id: str = "kubernetes_default",
    do_xcom_push: bool = True,
    task_id: str | None = None,
    **kwargs
):
    """
    Decorator factory that creates a Kubernetes task from a Python function.

    This is a factory: calling it with pod configuration returns a decorator,
    and that decorator converts a Python function into a KubernetesPodOperator
    task that executes the function inside a Kubernetes pod (the function is
    serialized and run in the pod's container — see the usage examples below).

    Args:
        image (str): Docker image to use for the pod
        namespace (str): Kubernetes namespace. Defaults to 'default'
        name (str, optional): Name of the pod. Auto-generated if not provided
        random_name_suffix (bool): Add random suffix to pod name. Default: True
        reattach_on_restart (bool): Reattach to existing pod on restart. Default: True
        startup_timeout_seconds (int): Pod startup timeout. Default: 120
        get_logs (bool): Retrieve pod logs. Default: True
        image_pull_policy (str): Image pull policy. Default: 'IfNotPresent'
        cmds (list[str], optional): Container command override
        arguments (list[str], optional): Container arguments override
        ports (list, optional): Container ports to expose
        volume_mounts (list, optional): Volume mounts for the pod
        volumes (list, optional): Volumes to attach to the pod
        env_vars (list, optional): Environment variables (dicts or V1EnvVar objects in examples)
        secrets (list, optional): Kubernetes secrets to mount
        configmaps (list[str], optional): ConfigMaps to mount
        labels (dict[str, str], optional): Pod labels
        node_selector (dict[str, str], optional): Node selection constraints
        affinity (dict, optional): Pod affinity rules
        tolerations (list, optional): Pod tolerations
        security_context (dict, optional): Security context
        container_resources (dict, optional): Resource limits and requests
        service_account_name (str, optional): Service account name
        is_delete_operator_pod (bool): Delete pod after execution. Default: True
        hostnetwork (bool): Use host networking. Default: False
        pod_template_file (str, optional): Path to pod template file
        pod_template_dict (dict, optional): Pod template as dictionary
        full_pod_spec (dict, optional): Complete pod specification
        init_containers (list, optional): Init containers for the pod
        sidecars (list, optional): Sidecar containers
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        do_xcom_push (bool): Push the function's return value to XCom. Default: True
        task_id (str, optional): Task ID override (defaults to the function name — TODO confirm)
        **kwargs: Additional arguments passed through to the underlying operator

    Returns:
        A decorator that wraps a Python function into a KubernetesPodOperator task.
    """
...

Execute shell commands in Kubernetes pods with simplified command specification and output handling.
def kubernetes_cmd_task(
    image: str,
    cmds: list[str],
    namespace: str = "default",
    name: str | None = None,
    random_name_suffix: bool = True,
    reattach_on_restart: bool = True,
    startup_timeout_seconds: int = 120,
    get_logs: bool = True,
    image_pull_policy: str = "IfNotPresent",
    arguments: list[str] | None = None,
    ports: list | None = None,
    volume_mounts: list | None = None,
    volumes: list | None = None,
    env_vars: list | None = None,
    secrets: list | None = None,
    configmaps: list[str] | None = None,
    labels: dict[str, str] | None = None,
    node_selector: dict[str, str] | None = None,
    affinity: dict | None = None,
    tolerations: list | None = None,
    security_context: dict | None = None,
    container_resources: dict | None = None,
    service_account_name: str | None = None,
    is_delete_operator_pod: bool = True,
    hostnetwork: bool = False,
    pod_template_file: str | None = None,
    pod_template_dict: dict | None = None,
    full_pod_spec: dict | None = None,
    init_containers: list | None = None,
    sidecars: list | None = None,
    cluster_context: str | None = None,
    config_file: str | None = None,
    in_cluster: bool | None = None,
    conn_id: str = "kubernetes_default",
    do_xcom_push: bool = True,
    task_id: str | None = None,
    **kwargs
):
    """
    Decorator factory that creates a Kubernetes shell-command task.

    Calling this with pod configuration returns a decorator; the decorated
    function becomes a KubernetesPodOperator task that runs the given shell
    commands (``cmds`` plus ``arguments``) inside a pod. The decorated
    function's body is typically just a placeholder (``pass`` in the
    examples below); the work happens in the container command.

    Args:
        image (str): Docker image to use for the pod
        cmds (list[str]): Shell commands to execute (e.g. ['sh', '-c'])
        namespace (str): Kubernetes namespace. Defaults to 'default'
        name (str, optional): Name of the pod. Auto-generated if not provided
        random_name_suffix (bool): Add random suffix to pod name. Default: True
        reattach_on_restart (bool): Reattach to existing pod on restart. Default: True
        startup_timeout_seconds (int): Pod startup timeout. Default: 120
        get_logs (bool): Retrieve pod logs. Default: True
        image_pull_policy (str): Image pull policy. Default: 'IfNotPresent'
        arguments (list[str], optional): Arguments for the commands
        ports (list, optional): Container ports to expose
        volume_mounts (list, optional): Volume mounts for the pod
        volumes (list, optional): Volumes to attach to the pod
        env_vars (list, optional): Environment variables
        secrets (list, optional): Kubernetes secrets to mount
        configmaps (list[str], optional): ConfigMaps to mount
        labels (dict[str, str], optional): Pod labels
        node_selector (dict[str, str], optional): Node selection constraints
        affinity (dict, optional): Pod affinity rules
        tolerations (list, optional): Pod tolerations
        security_context (dict, optional): Security context
        container_resources (dict, optional): Resource limits and requests
        service_account_name (str, optional): Service account name
        is_delete_operator_pod (bool): Delete pod after execution. Default: True
        hostnetwork (bool): Use host networking. Default: False
        pod_template_file (str, optional): Path to pod template file
        pod_template_dict (dict, optional): Pod template as dictionary
        full_pod_spec (dict, optional): Complete pod specification
        init_containers (list, optional): Init containers for the pod
        sidecars (list, optional): Sidecar containers
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        do_xcom_push (bool): Push return value to XCom. Default: True
        task_id (str, optional): Task ID override
        **kwargs: Additional arguments passed through to the underlying operator

    Returns:
        A decorator that wraps a (placeholder) function into a KubernetesPodOperator task.
    """
...

Internal operator classes used by the decorators (not typically used directly).
class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
    """Internal decorated operator for Kubernetes tasks.

    Combines DecoratedOperator and KubernetesPodOperator (its two bases);
    produced by the kubernetes_task decorator rather than instantiated
    directly by users.
    """
    ...
class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
"""Internal decorated operator for Kubernetes command tasks."""
...

from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task
@kubernetes_task(
image='python:3.9-slim',
namespace='default'
)
def process_data():
    """Generate a random sample dataset and return per-category mean values.

    Builds a 1000-row frame of normally distributed values, each tagged with
    one of three category labels, then reports the mean value per category
    as a plain dict (XCom-friendly).
    """
    import numpy as np
    import pandas as pd

    sample_size = 1000
    frame = pd.DataFrame(
        {
            'values': np.random.randn(sample_size),
            'categories': np.random.choice(['A', 'B', 'C'], sample_size),
        }
    )

    result = frame.groupby('categories')['values'].mean().to_dict()
    print(f"Processing complete: {result}")
    return result
# Use in DAG
result = process_data()

@kubernetes_task(
image='python:3.9',
namespace='data-processing',
env_vars=[
{'name': 'PYTHONPATH', 'value': '/opt/app'},
{'name': 'DATA_SOURCE', 'value': 'production'}
]
)
def analyze_data():
    """Train and score a RandomForest on synthetic data inside the pod.

    Installs scikit-learn and matplotlib at runtime via pip (they are not
    baked into the base image), fits a classifier on generated data, and
    returns the holdout accuracy plus the sample count.
    """
    import subprocess
    import sys

    # Base image lacks the ML stack, so install pinned versions on the fly.
    subprocess.check_call([
        sys.executable, '-m', 'pip', 'install',
        'scikit-learn==1.3.0', 'matplotlib==3.7.2'
    ])

    # Imports must come after the runtime install above.
    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split

    # Fixed seeds keep the synthetic problem and split reproducible.
    features, labels = make_classification(n_samples=1000, n_features=20, random_state=42)
    x_train, x_test, y_train, y_test = train_test_split(
        features, labels, test_size=0.2, random_state=42
    )

    classifier = RandomForestClassifier(n_estimators=100, random_state=42)
    classifier.fit(x_train, y_train)

    accuracy = accuracy_score(y_test, classifier.predict(x_test))
    print(f"Model accuracy: {accuracy:.4f}")
    return {'accuracy': accuracy, 'n_samples': len(features)}
analysis_result = analyze_data()

from kubernetes.client import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource
@kubernetes_task(
image='python:3.9-slim',
namespace='default',
volumes=[
V1Volume(
name='data-volume',
persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
claim_name='shared-data-pvc'
)
)
],
volume_mounts=[
V1VolumeMount(
name='data-volume',
mount_path='/data'
)
]
)
def process_files():
    """Summarize JSON files found on the mounted /data volume.

    Counts records in every top-level .json file under /data, writes the
    summary to /data/processing_results.json, and returns how many files
    were processed.
    """
    import json
    import os

    data_dir = '/data'
    results = []

    for filename in os.listdir(data_dir):
        if not filename.endswith('.json'):
            continue
        with open(os.path.join(data_dir, filename), 'r') as f:
            data = json.load(f)
        # A JSON array counts one record per element; any other value is one record.
        results.append({
            'file': filename,
            'record_count': len(data) if isinstance(data, list) else 1,
        })

    # Persist the summary alongside the inputs on the shared volume.
    with open('/data/processing_results.json', 'w') as f:
        json.dump(results, f, indent=2)

    return {'processed_files': len(results)}
file_processing = process_files()

from airflow.providers.cncf.kubernetes.secret import Secret
from kubernetes.client import V1EnvVar
@kubernetes_task(
image='python:3.9-slim',
namespace='default',
secrets=[
Secret('env', 'DB_PASSWORD', 'database-secret', 'password'),
Secret('env', 'API_KEY', 'api-secret', 'key')
],
env_vars=[
V1EnvVar(name='DB_HOST', value='postgresql.default.svc.cluster.local'),
V1EnvVar(name='DB_NAME', value='analytics')
],
configmaps=['app-config']
)
def database_operation():
    """Count the last day's user events in Postgres using injected credentials.

    DB_HOST/DB_NAME come from plain env vars; DB_PASSWORD is injected from a
    Kubernetes secret. The psycopg2 driver is installed at runtime because
    the slim base image does not ship it.
    """
    import os
    import subprocess
    import sys

    # Install the database driver on the fly inside the pod.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'psycopg2-binary'])
    import psycopg2

    # Credentials and endpoints arrive via the pod environment.
    connection = psycopg2.connect(
        host=os.environ['DB_HOST'],
        database=os.environ['DB_NAME'],
        user='analytics_user',
        password=os.environ['DB_PASSWORD'],
    )
    cursor = connection.cursor()
    cursor.execute("SELECT COUNT(*) FROM user_events WHERE created_at >= NOW() - INTERVAL '1 day'")
    daily_events = cursor.fetchone()[0]
    cursor.close()
    connection.close()
    return {'daily_events': daily_events}
db_task = database_operation()

from airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd import kubernetes_cmd_task
@kubernetes_cmd_task(
image='ubuntu:20.04',
cmds=['bash', '-c'],
arguments=['echo "Starting data backup" && tar -czf /backup/data-$(date +%Y%m%d).tar.gz /data'],
namespace='backups'
)
def backup_data():
    """Placeholder body: the actual backup runs via the operator's cmds/arguments."""
backup_task = backup_data()

@kubernetes_cmd_task(
image='alpine:latest',
cmds=['sh', '-c'],
arguments=['''
set -e
echo "Installing dependencies..."
apk add --no-cache curl jq
echo "Downloading data..."
curl -o /tmp/data.json "https://api.example.com/data"
echo "Processing data..."
cat /tmp/data.json | jq '.results | length'
echo "Uploading results..."
curl -X POST -H "Content-Type: application/json" \\
-d @/tmp/data.json \\
"https://webhook.example.com/processed"
echo "Process completed successfully"
'''],
namespace='default',
env_vars=[
{'name': 'API_ENDPOINT', 'value': 'https://api.example.com'},
{'name': 'WEBHOOK_URL', 'value': 'https://webhook.example.com'}
]
)
def api_data_processor():
    """Placeholder body: the multi-step shell pipeline in cmds/arguments does the work."""
api_task = api_data_processor()

@kubernetes_task(
image='python:3.9',
namespace='resource-limited',
container_resources={
'requests': {
'cpu': '100m',
'memory': '256Mi'
},
'limits': {
'cpu': '500m',
'memory': '1Gi'
}
}
)
def resource_intensive_task():
    """Run a CPU/memory-heavy SVD to exercise the pod's resource limits."""
    import time

    import numpy as np

    # Large random matrix keeps the CPU busy within the configured limits.
    large_array = np.random.randn(10000, 1000)
    decomposition = np.linalg.svd(large_array)
    print(f"SVD computation completed. Shape: {decomposition[0].shape}")

    # Keep the pod alive briefly so resource usage is observable.
    time.sleep(10)
    return {'status': 'completed', 'array_size': large_array.shape}
intensive_task = resource_intensive_task()

@kubernetes_task(
image='tensorflow/tensorflow:2.13.0-gpu',
namespace='ml-training',
node_selector={'accelerator': 'nvidia-gpu'},
tolerations=[
{
'key': 'nvidia.com/gpu',
'operator': 'Exists',
'effect': 'NoSchedule'
}
],
container_resources={
'limits': {
'nvidia.com/gpu': '1'
}
}
)
def gpu_training_task():
    """Check GPU availability and, if present, run a matmul on the first GPU."""
    import tensorflow as tf

    gpus = tf.config.experimental.list_physical_devices('GPU')
    print(f"Available GPUs: {len(gpus)}")

    if not gpus:
        # Scheduled onto a node without an accelerator; report and bail out.
        print("No GPU available, using CPU")
        return {'gpu_used': False}

    # Pin the computation to the first GPU device.
    with tf.device('/GPU:0'):
        matrix_a = tf.random.normal([1000, 1000])
        matrix_b = tf.random.normal([1000, 1000])
        result = tf.matmul(matrix_a, matrix_b)
    print(f"GPU computation completed. Result shape: {result.shape}")
    return {'gpu_used': True, 'result_shape': str(result.shape)}
gpu_task = gpu_training_task()

from airflow import DAG
from datetime import datetime
dag = DAG(
'kubernetes_decorated_workflow',
start_date=datetime(2023, 1, 1),
schedule_interval='@daily',
catchup=False
)
@kubernetes_task(
image='python:3.9-slim',
namespace='data-pipeline',
dag=dag
)
def extract_data():
    """Produce 1000 synthetic records with random values and categories."""
    import random
    import json

    records = []
    for record_id in range(1000):
        records.append({
            'id': record_id,
            'value': random.randint(1, 100),
            'category': random.choice(['A', 'B', 'C']),
        })
    print(f"Extracted {len(records)} records")
    return records
@kubernetes_task(
image='python:3.9-slim',
namespace='data-pipeline',
dag=dag
)
def transform_data(raw_data):
    """Summarize record values per category.

    Groups each record's 'value' by its 'category' and returns, per
    category, the count, mean, median, min and max of the values.
    """
    import statistics

    # Bucket the raw values by their category label.
    grouped = {}
    for record in raw_data:
        grouped.setdefault(record['category'], []).append(record['value'])

    stats = {
        category: {
            'count': len(values),
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            'min': min(values),
            'max': max(values),
        }
        for category, values in grouped.items()
    }
    print(f"Transformed data for {len(stats)} categories")
    return stats
@kubernetes_cmd_task(
image='alpine:latest',
cmds=['sh', '-c'],
arguments=['echo "Loading data..." && sleep 5 && echo "Data loaded successfully"'],
namespace='data-pipeline',
dag=dag
)
def load_data():
    """Placeholder body: loading is performed by the shell command in arguments."""
# Set up task dependencies
raw_data = extract_data()
processed_data = transform_data(raw_data)
load_task = load_data()
# Dependencies
raw_data >> processed_data >> load_task

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes