CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-cncf-kubernetes

Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/decorators.md

Task Decorators

Create Kubernetes tasks using Python decorators with automatic pod configuration and execution. Decorators provide a simplified interface for running Python functions and shell commands in Kubernetes pods.

Capabilities

Kubernetes Task Decorator

Execute Python functions in Kubernetes pods with automatic serialization, pod management, and result handling.

# API stub: signature and contract only. The real implementation ships in the
# apache-airflow-providers-cncf-kubernetes package
# (airflow.providers.cncf.kubernetes.decorators.kubernetes).
def kubernetes_task(
    image: str,
    namespace: str = "default",
    name: str | None = None,
    random_name_suffix: bool = True,
    reattach_on_restart: bool = True,
    startup_timeout_seconds: int = 120,
    get_logs: bool = True,
    image_pull_policy: str = "IfNotPresent",
    cmds: list[str] | None = None,
    arguments: list[str] | None = None,
    ports: list | None = None,
    volume_mounts: list | None = None,
    volumes: list | None = None,
    env_vars: list | None = None,
    secrets: list | None = None,
    configmaps: list[str] | None = None,
    labels: dict[str, str] | None = None,
    node_selector: dict[str, str] | None = None,
    affinity: dict | None = None,
    tolerations: list | None = None,
    security_context: dict | None = None,
    container_resources: dict | None = None,
    service_account_name: str | None = None,
    is_delete_operator_pod: bool = True,
    hostnetwork: bool = False,
    pod_template_file: str | None = None,
    pod_template_dict: dict | None = None,
    full_pod_spec: dict | None = None,
    init_containers: list | None = None,
    sidecars: list | None = None,
    cluster_context: str | None = None,
    config_file: str | None = None,
    in_cluster: bool | None = None,
    conn_id: str = "kubernetes_default",
    do_xcom_push: bool = True,
    task_id: str | None = None,
    **kwargs
):
    """
    Decorator to create Kubernetes task from Python function.

    This decorator converts a Python function into a KubernetesPodOperator task
    that executes the function inside a Kubernetes pod.

    Note:
        pod_template_file, pod_template_dict and full_pod_spec appear to be
        alternative ways to seed the pod specification — their precedence is
        not stated here; confirm against the provider implementation.

    Args:
        image (str): Docker image to use for the pod
        namespace (str): Kubernetes namespace. Defaults to 'default'
        name (str, optional): Name of the pod. Auto-generated if not provided
        random_name_suffix (bool): Add random suffix to pod name. Default: True
        reattach_on_restart (bool): Reattach to existing pod on restart. Default: True
        startup_timeout_seconds (int): Pod startup timeout. Default: 120
        get_logs (bool): Retrieve pod logs. Default: True
        image_pull_policy (str): Image pull policy. Default: 'IfNotPresent'
        cmds (list[str], optional): Container command override
        arguments (list[str], optional): Container arguments override
        ports (list, optional): Container ports to expose
        volume_mounts (list, optional): Volume mounts for the pod
        volumes (list, optional): Volumes to attach to the pod
        env_vars (list, optional): Environment variables
        secrets (list, optional): Kubernetes secrets to mount
        configmaps (list[str], optional): ConfigMaps to mount
        labels (dict[str, str], optional): Pod labels
        node_selector (dict[str, str], optional): Node selection constraints
        affinity (dict, optional): Pod affinity rules
        tolerations (list, optional): Pod tolerations
        security_context (dict, optional): Security context
        container_resources (dict, optional): Resource limits and requests
        service_account_name (str, optional): Service account name
        is_delete_operator_pod (bool): Delete pod after execution. Default: True
        hostnetwork (bool): Use host networking. Default: False
        pod_template_file (str, optional): Path to pod template file
        pod_template_dict (dict, optional): Pod template as dictionary
        full_pod_spec (dict, optional): Complete pod specification
        init_containers (list, optional): Init containers for the pod
        sidecars (list, optional): Sidecar containers
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        do_xcom_push (bool): Push return value to XCom. Default: True
        task_id (str, optional): Task ID override
        **kwargs: Additional arguments passed to the operator

    Returns:
        Decorated function that creates a KubernetesPodOperator task
    """
    ...  # implementation provided by the provider package

Kubernetes Command Task Decorator

Execute shell commands in Kubernetes pods with simplified command specification and output handling.

# API stub: signature and contract only. Unlike kubernetes_task, `cmds` is
# required here — the decorated function body is not what runs in the pod.
def kubernetes_cmd_task(
    image: str,
    cmds: list[str],
    namespace: str = "default",
    name: str | None = None,
    random_name_suffix: bool = True,
    reattach_on_restart: bool = True,
    startup_timeout_seconds: int = 120,
    get_logs: bool = True,
    image_pull_policy: str = "IfNotPresent",
    arguments: list[str] | None = None,
    ports: list | None = None,
    volume_mounts: list | None = None,
    volumes: list | None = None,
    env_vars: list | None = None,
    secrets: list | None = None,
    configmaps: list[str] | None = None,
    labels: dict[str, str] | None = None,
    node_selector: dict[str, str] | None = None,
    affinity: dict | None = None,
    tolerations: list | None = None,
    security_context: dict | None = None,
    container_resources: dict | None = None,
    service_account_name: str | None = None,
    is_delete_operator_pod: bool = True,
    hostnetwork: bool = False,
    pod_template_file: str | None = None,
    pod_template_dict: dict | None = None,
    full_pod_spec: dict | None = None,
    init_containers: list | None = None,
    sidecars: list | None = None,
    cluster_context: str | None = None,
    config_file: str | None = None,
    in_cluster: bool | None = None,
    conn_id: str = "kubernetes_default",
    do_xcom_push: bool = True,
    task_id: str | None = None,
    **kwargs
):
    """
    Decorator to create Kubernetes command task.

    This decorator creates a KubernetesPodOperator task that executes
    the specified shell commands inside a Kubernetes pod.

    Args:
        image (str): Docker image to use for the pod
        cmds (list[str]): Shell commands to execute
        namespace (str): Kubernetes namespace. Defaults to 'default'
        name (str, optional): Name of the pod. Auto-generated if not provided
        random_name_suffix (bool): Add random suffix to pod name. Default: True
        reattach_on_restart (bool): Reattach to existing pod on restart. Default: True
        startup_timeout_seconds (int): Pod startup timeout. Default: 120
        get_logs (bool): Retrieve pod logs. Default: True
        image_pull_policy (str): Image pull policy. Default: 'IfNotPresent'
        arguments (list[str], optional): Arguments for the commands
        ports (list, optional): Container ports to expose
        volume_mounts (list, optional): Volume mounts for the pod
        volumes (list, optional): Volumes to attach to the pod
        env_vars (list, optional): Environment variables
        secrets (list, optional): Kubernetes secrets to mount
        configmaps (list[str], optional): ConfigMaps to mount
        labels (dict[str, str], optional): Pod labels
        node_selector (dict[str, str], optional): Node selection constraints
        affinity (dict, optional): Pod affinity rules
        tolerations (list, optional): Pod tolerations
        security_context (dict, optional): Security context
        container_resources (dict, optional): Resource limits and requests
        service_account_name (str, optional): Service account name
        is_delete_operator_pod (bool): Delete pod after execution. Default: True
        hostnetwork (bool): Use host networking. Default: False
        pod_template_file (str, optional): Path to pod template file
        pod_template_dict (dict, optional): Pod template as dictionary
        full_pod_spec (dict, optional): Complete pod specification
        init_containers (list, optional): Init containers for the pod
        sidecars (list, optional): Sidecar containers
        cluster_context (str, optional): Kubernetes cluster context
        config_file (str, optional): Path to kubeconfig file
        in_cluster (bool, optional): Use in-cluster configuration
        conn_id (str): Kubernetes connection ID. Default: 'kubernetes_default'
        do_xcom_push (bool): Push return value to XCom. Default: True
        task_id (str, optional): Task ID override
        **kwargs: Additional arguments passed to the operator

    Returns:
        Decorated function that creates a KubernetesPodOperator task
    """
    ...  # implementation provided by the provider package

Internal Decorated Operators

Internal operator classes used by the decorators (not typically used directly).

# NOTE(review): DecoratedOperator and KubernetesPodOperator are provided by
# the Airflow provider package — they are not defined in this document.
class _KubernetesDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
    """Internal decorated operator for Kubernetes tasks (private API; leading
    underscore — not intended for direct use)."""
    ...

# NOTE(review): bases come from the Airflow provider package — not defined here.
class _KubernetesCmdDecoratedOperator(DecoratedOperator, KubernetesPodOperator):
    """Internal decorated operator for Kubernetes command tasks (private API;
    leading underscore — not intended for direct use)."""
    ...

Usage Examples

Basic Python Function Execution

from airflow.providers.cncf.kubernetes.decorators.kubernetes import kubernetes_task

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default'
)
def process_data():
    """Simple data processing function.

    Builds a small random dataset and returns the per-category mean of the
    numeric column; the decorator pushes the dict to XCom.
    """
    import pandas as pd
    import numpy as np

    # Sample dataset: 1000 random values, each tagged with one of three categories.
    sample_values = np.random.randn(1000)
    sample_categories = np.random.choice(['A', 'B', 'C'], 1000)
    frame = pd.DataFrame({
        'values': sample_values,
        'categories': sample_categories
    })

    # Per-category mean, converted to a plain dict for XCom.
    result = frame.groupby('categories')['values'].mean().to_dict()
    print(f"Processing complete: {result}")

    return result

# Use in DAG
result = process_data()

Function with Dependencies and Packages

@kubernetes_task(
    image='python:3.9',
    namespace='data-processing',
    env_vars=[
        {'name': 'PYTHONPATH', 'value': '/opt/app'},
        {'name': 'DATA_SOURCE', 'value': 'production'}
    ]
)
def analyze_data():
    """Data analysis with external libraries.

    Installs scikit-learn and matplotlib inside the pod at runtime, then
    trains and scores a random-forest classifier on synthetic data.
    """
    # Install packages at runtime
    import subprocess
    import sys

    subprocess.check_call([
        sys.executable, '-m', 'pip', 'install', 
        'scikit-learn==1.3.0', 'matplotlib==3.7.2'
    ])

    # Imports deferred until after the runtime install above.
    from sklearn.datasets import make_classification
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score

    # Synthetic classification problem with a fixed seed, 80/20 split.
    features, labels = make_classification(n_samples=1000, n_features=20, random_state=42)
    train_x, test_x, train_y, test_y = train_test_split(features, labels, test_size=0.2, random_state=42)

    # Train model
    forest = RandomForestClassifier(n_estimators=100, random_state=42)
    forest.fit(train_x, train_y)

    # Evaluate on the held-out split.
    accuracy = accuracy_score(test_y, forest.predict(test_x))

    print(f"Model accuracy: {accuracy:.4f}")
    return {'accuracy': accuracy, 'n_samples': len(features)}

analysis_result = analyze_data()

Function with Volume Mounts

from kubernetes.client import V1Volume, V1VolumeMount, V1PersistentVolumeClaimVolumeSource

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default',
    volumes=[
        V1Volume(
            name='data-volume',
            persistent_volume_claim=V1PersistentVolumeClaimVolumeSource(
                claim_name='shared-data-pvc'
            )
        )
    ],
    volume_mounts=[
        V1VolumeMount(
            name='data-volume',
            mount_path='/data'
        )
    ]
)
def process_files():
    """Process files from mounted volume.

    Scans /data (the mounted PVC) for *.json files, records how many
    top-level records each file holds, and writes a summary back to the
    volume for downstream tasks.

    Returns:
        dict: {'processed_files': <number of JSON files summarized>}
    """
    import json
    from pathlib import Path

    data_dir = Path('/data')
    results = []

    # Summarize every JSON file in the data directory. pathlib replaces the
    # manual os.listdir/endswith/os.path.join dance, and sorting makes the
    # output order deterministic (os.listdir order is arbitrary).
    for filepath in sorted(data_dir.glob('*.json')):
        with filepath.open('r', encoding='utf-8') as f:
            data = json.load(f)
        # A JSON array counts its elements; any other JSON value counts as 1.
        results.append({
            'file': filepath.name,
            'record_count': len(data) if isinstance(data, list) else 1
        })

    # Write results back onto the shared volume.
    # NOTE(review): like the original, a rerun would pick up this summary
    # file too, since it also matches *.json — confirm that's acceptable.
    with open('/data/processing_results.json', 'w') as f:
        json.dump(results, f, indent=2)

    return {'processed_files': len(results)}

file_processing = process_files()

Function with Secrets and ConfigMaps

from airflow.providers.cncf.kubernetes.secret import Secret
from kubernetes.client import V1EnvVar

@kubernetes_task(
    image='python:3.9-slim',
    namespace='default',
    secrets=[
        Secret('env', 'DB_PASSWORD', 'database-secret', 'password'),
        Secret('env', 'API_KEY', 'api-secret', 'key')
    ],
    env_vars=[
        V1EnvVar(name='DB_HOST', value='postgresql.default.svc.cluster.local'),
        V1EnvVar(name='DB_NAME', value='analytics')
    ],
    configmaps=['app-config']
)
def database_operation():
    """Perform database operations with secrets.

    Counts user_events rows created in the last 24 hours. DB_PASSWORD is
    injected into the pod environment from a Kubernetes secret; host and
    database name come from plain env vars.

    Returns:
        dict: {'daily_events': <row count>}
    """
    import os
    import subprocess
    import sys

    # Install database client at runtime (the slim image ships without it).
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'psycopg2-binary'])

    import psycopg2

    # Get credentials from environment (injected from secrets)
    db_host = os.environ['DB_HOST']
    db_name = os.environ['DB_NAME']
    db_password = os.environ['DB_PASSWORD']

    # Connect and query
    conn = psycopg2.connect(
        host=db_host,
        database=db_name,
        user='analytics_user',
        password=db_password
    )
    try:
        # The cursor context manager closes the cursor even if the query
        # raises; the original leaked both cursor and connection on error.
        with conn.cursor() as cursor:
            cursor.execute("SELECT COUNT(*) FROM user_events WHERE created_at >= NOW() - INTERVAL '1 day'")
            daily_events = cursor.fetchone()[0]
    finally:
        conn.close()

    return {'daily_events': daily_events}

db_task = database_operation()

Command Task Examples

from airflow.providers.cncf.kubernetes.decorators.kubernetes_cmd import kubernetes_cmd_task

# Shell-only task: with kubernetes_cmd_task the pod presumably runs
# cmds + arguments, so the Python body below is just a placeholder.
@kubernetes_cmd_task(
    image='ubuntu:20.04',
    cmds=['bash', '-c'],
    arguments=['echo "Starting data backup" && tar -czf /backup/data-$(date +%Y%m%d).tar.gz /data'],
    namespace='backups'
)
def backup_data():
    """Simple backup command.

    Archives /data into a date-stamped tarball under /backup.
    NOTE(review): no volumes are mounted in this decorator config, so
    /data and /backup must already exist in the container — confirm
    against the real deployment.
    """
    pass

backup_task = backup_data()

Advanced Command Task with Multiple Steps

# Multi-step shell pipeline executed via sh -c; the Python body is a placeholder.
# NOTE(review): env_vars declares API_ENDPOINT and WEBHOOK_URL, but the script
# below hardcodes the URLs instead of reading "$API_ENDPOINT"/"$WEBHOOK_URL" —
# either use the variables in the script or drop them; confirm intent.
@kubernetes_cmd_task(
    image='alpine:latest',
    cmds=['sh', '-c'],
    arguments=['''
        set -e
        echo "Installing dependencies..."
        apk add --no-cache curl jq
        
        echo "Downloading data..."
        curl -o /tmp/data.json "https://api.example.com/data"
        
        echo "Processing data..."
        cat /tmp/data.json | jq '.results | length'
        
        echo "Uploading results..."
        curl -X POST -H "Content-Type: application/json" \\
             -d @/tmp/data.json \\
             "https://webhook.example.com/processed"
        
        echo "Process completed successfully"
    '''],
    namespace='default',
    env_vars=[
        {'name': 'API_ENDPOINT', 'value': 'https://api.example.com'},
        {'name': 'WEBHOOK_URL', 'value': 'https://webhook.example.com'}
    ]
)
def api_data_processor():
    """Multi-step API data processing.

    Installs curl/jq, downloads a JSON payload, counts its results with jq,
    and re-posts the payload to a webhook. `set -e` aborts on the first
    failing step.
    """
    pass

api_task = api_data_processor()

Task with Resource Limits

@kubernetes_task(
    image='python:3.9',
    namespace='resource-limited',
    container_resources={
        'requests': {
            'cpu': '100m',
            'memory': '256Mi'
        },
        'limits': {
            'cpu': '500m',
            'memory': '1Gi'
        }
    }
)
def resource_intensive_task():
    """Task with specific resource requirements.

    Runs an SVD on a large random matrix to exercise the configured CPU and
    memory limits, then idles briefly before returning.
    """
    import time
    import numpy as np

    # CPU-intensive workload: full SVD of a 10000x1000 random matrix.
    workload = np.random.randn(10000, 1000)
    svd_factors = np.linalg.svd(workload)

    print(f"SVD computation completed. Shape: {svd_factors[0].shape}")

    # Simulate some additional processing time.
    time.sleep(10)

    return {'status': 'completed', 'array_size': workload.shape}

Task with Node Selection

@kubernetes_task(
    image='tensorflow/tensorflow:2.13.0-gpu',
    namespace='ml-training',
    node_selector={'accelerator': 'nvidia-gpu'},
    tolerations=[
        {
            'key': 'nvidia.com/gpu',
            'operator': 'Exists',
            'effect': 'NoSchedule'
        }
    ],
    container_resources={
        'limits': {
            'nvidia.com/gpu': '1'
        }
    }
)
def gpu_training_task():
    """Machine learning task requiring GPU.

    Runs a small matrix multiply on the first visible GPU; if the pod sees
    no accelerator, it reports the CPU fallback instead.
    """
    import tensorflow as tf

    # Check GPU availability
    gpus = tf.config.experimental.list_physical_devices('GPU')
    print(f"Available GPUs: {len(gpus)}")

    # Guard clause: no accelerator visible inside the pod.
    if not gpus:
        print("No GPU available, using CPU")
        return {'gpu_used': False}

    # Simple GPU computation on the first device.
    with tf.device('/GPU:0'):
        matrix_a = tf.random.normal([1000, 1000])
        matrix_b = tf.random.normal([1000, 1000])
        result = tf.matmul(matrix_a, matrix_b)

    print(f"GPU computation completed. Result shape: {result.shape}")
    return {'gpu_used': True, 'result_shape': str(result.shape)}

gpu_task = gpu_training_task()

DAG with Multiple Decorated Tasks

from airflow import DAG
from datetime import datetime

# Daily pipeline DAG shared by the decorated tasks below (passed via dag=...).
dag = DAG(
    'kubernetes_decorated_workflow',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',  # NOTE(review): renamed to `schedule` in newer Airflow — confirm target version
    catchup=False  # skip backfilling runs between start_date and now
)

@kubernetes_task(
    image='python:3.9-slim',
    namespace='data-pipeline',
    dag=dag
)
def extract_data():
    """Extract data from source.

    Simulates extraction by generating 1000 random records; the returned
    list is pushed to XCom for the downstream transform task.

    Returns:
        list[dict]: records with 'id', 'value' (1-100) and 'category' (A/B/C).
    """
    import random  # unused `import json` removed

    # Simulate data extraction
    data = [
        {'id': i, 'value': random.randint(1, 100), 'category': random.choice(['A', 'B', 'C'])}
        for i in range(1000)
    ]

    print(f"Extracted {len(data)} records")
    return data

@kubernetes_task(
    image='python:3.9-slim',
    namespace='data-pipeline',
    dag=dag
)
def transform_data(raw_data):
    """Transform extracted data.

    Groups the extracted records by 'category' and computes per-category
    descriptive statistics.

    Args:
        raw_data (list[dict]): records with 'category' and 'value' keys
            (the XCom output of extract_data).

    Returns:
        dict: per-category {'count', 'mean', 'median', 'min', 'max'}.
    """
    import statistics

    # Group values by category; setdefault replaces the manual
    # check-then-insert pattern of the original.
    categories = {}
    for record in raw_data:
        categories.setdefault(record['category'], []).append(record['value'])

    # Calculate statistics for each category.
    stats = {
        cat: {
            'count': len(values),
            'mean': statistics.mean(values),
            'median': statistics.median(values),
            'min': min(values),
            'max': max(values)
        }
        for cat, values in categories.items()
    }

    print(f"Transformed data for {len(stats)} categories")
    return stats

# Shell-only load step; the Python body is a placeholder — the pod
# presumably runs cmds + arguments.
@kubernetes_cmd_task(
    image='alpine:latest',
    cmds=['sh', '-c'],
    arguments=['echo "Loading data..." && sleep 5 && echo "Data loaded successfully"'],
    namespace='data-pipeline',
    dag=dag
)
def load_data():
    """Load processed data (simulated by a short shell sleep)."""
    pass

# Set up task dependencies: calling the decorated functions creates the
# task instances (and passing raw_data into transform_data already links
# extract -> transform via XCom).
raw_data = extract_data()
processed_data = transform_data(raw_data)
load_task = load_data()

# Dependencies: extract >> transform >> load
raw_data >> processed_data >> load_task

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes

docs

api-integration.md

decorators.md

executors.md

index.md

job-management.md

monitoring.md

pod-operations.md

resource-management.md

spark-integration.md

tile.json