tessl/pypi-apache-airflow-providers-cncf-kubernetes

Comprehensive Kubernetes integration for Apache Airflow workflow orchestration and task execution

docs/executors.md

Executors

Run Airflow tasks on Kubernetes infrastructure with executors that manage task distribution, pod creation, and lifecycle management across Kubernetes clusters.

Capabilities

Kubernetes Executor

Execute Airflow tasks as Kubernetes pods with comprehensive cluster management, resource allocation, and monitoring.

class KubernetesExecutor(BaseExecutor):
    """
    Executes tasks in Kubernetes pods.
    
    This executor launches each task as a separate Kubernetes pod, providing
    isolation, scalability, and resource management for Airflow task execution.
    
    Configuration is typically done through airflow.cfg under the
    [kubernetes_executor] section.
    
    Key Features:
    - Task isolation in separate pods
    - Dynamic resource allocation
    - Multi-namespace support
    - Pod template customization
    - Automatic cleanup and monitoring
    """
    def __init__(self, **kwargs): ...

    def start(self) -> None:
        """
        Start the executor.
        
        Initializes the Kubernetes client, starts the job watcher process,
        and prepares the executor for task scheduling.
        """
        ...

    def sync(self) -> None:
        """
        Synchronize task states.
        
        Updates task states based on pod status, processes completed tasks,
        and handles task state transitions.
        """
        ...

    def end(self) -> None:
        """
        End executor.
        
        Gracefully shuts down the executor, cleans up resources,
        and terminates background processes.
        """
        ...

    def terminate(self) -> None:
        """
        Terminate executor.
        
        Forcefully terminates the executor and all associated processes.
        """
        ...

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: str | None = None,
        executor_config: dict | None = None
    ) -> None:
        """
        Execute task asynchronously.
        
        Args:
            key: Task instance key
            command: Command to execute
            queue: Execution queue (optional)
            executor_config: Executor-specific configuration
        """
        ...

    def adopt_launched_task(self, ti: TaskInstance, last_heartbeat: datetime, session: Session) -> bool:
        """
        Adopt a launched task.
        
        Args:
            ti: Task instance to adopt
            last_heartbeat: Last heartbeat timestamp
            session: Database session
            
        Returns:
            bool: True if task was successfully adopted
        """
        ...

Local Kubernetes Executor

Hybrid executor that runs local tasks with LocalExecutor and Kubernetes tasks with KubernetesExecutor based on queue configuration.

class LocalKubernetesExecutor(BaseExecutor):
    """
    Hybrid executor running LocalExecutor for local tasks and 
    KubernetesExecutor for Kubernetes tasks.
    
    Tasks are routed to the appropriate executor based on the queue name.
    By default, tasks in the 'kubernetes' queue are executed on Kubernetes,
    while all other tasks are executed locally.
    
    Configuration:
    - kubernetes_queue: Queue name for Kubernetes tasks (default: 'kubernetes')
    """
    def __init__(self, local_executor: BaseExecutor, kubernetes_executor: BaseExecutor, **kwargs): ...

    def start(self) -> None:
        """Start both local and Kubernetes executors."""
        ...

    def sync(self) -> None:
        """Synchronize both executors."""
        ...

    def end(self) -> None:
        """End both executors."""
        ...

    def terminate(self) -> None:
        """Terminate both executors."""
        ...

    def execute_async(
        self,
        key: TaskInstanceKey,
        command: CommandType,
        queue: str | None = None,
        executor_config: dict | None = None
    ) -> None:
        """
        Route task to appropriate executor based on queue.
        
        Args:
            key: Task instance key
            command: Command to execute  
            queue: Execution queue - determines executor selection
            executor_config: Executor-specific configuration
        """
        ...

    def has_task(self, ti: TaskInstance) -> bool:
        """Check if task exists in either executor."""
        ...

    def heartbeat(self) -> None:
        """Heartbeat both executors."""
        ...

Kubernetes Job Watcher

Process that monitors Kubernetes job status and manages job lifecycle for the executor.

class KubernetesJobWatcher(multiprocessing.Process, LoggingMixin):
    """
    Watches Kubernetes jobs for state changes.
    
    This process monitors pods created by the KubernetesExecutor,
    tracks their status changes, and communicates updates back
    to the main executor process.
    
    Args:
        namespace (str): Kubernetes namespace to watch
        multi_namespace_mode (bool): Enable multi-namespace watching
        watcher_queue (Queue): Queue for status updates
        resource_version (str, optional): Starting resource version
        worker_uuid (str): Unique identifier for this worker
        kube_config (Configuration): Kubernetes client configuration
    """
    def __init__(
        self,
        namespace: str,
        multi_namespace_mode: bool,
        watcher_queue: Queue,
        resource_version: str | None,
        worker_uuid: str,
        kube_config: Configuration,
        **kwargs
    ): ...

    def run(self) -> None:
        """Main process loop for watching Kubernetes jobs."""
        ...

    def _run(self) -> None:
        """Internal run method with error handling."""
        ...

Airflow Kubernetes Scheduler

Scheduler component that manages pod creation and task distribution on Kubernetes clusters.

class AirflowKubernetesScheduler(LoggingMixin):
    """
    Schedules pods on Kubernetes.
    
    Manages the creation, monitoring, and cleanup of Kubernetes pods
    for task execution. Handles pod templates, resource allocation,
    and namespace management.
    
    Args:
        kube_config (Configuration): Kubernetes client configuration
        task_queue (Queue): Queue of tasks to execute
        result_queue (Queue): Queue for execution results
        kube_client (ApiClient): Kubernetes API client
        worker_uuid (str): Unique identifier for this worker
    """
    def __init__(
        self,
        kube_config: Configuration,
        task_queue: Queue,
        result_queue: Queue,
        kube_client: ApiClient,
        worker_uuid: str,
        **kwargs
    ): ...

    def run_pod_async(self, pod: V1Pod, **kwargs) -> None:
        """Run pod asynchronously."""
        ...

    def delete_pod(self, pod_name: str, namespace: str) -> None:
        """Delete a pod."""
        ...

    def patch_pod_executor_done(self, pod_name: str, namespace: str) -> None:
        """Mark pod as executor done."""
        ...

    def _make_safe_pod_id(self, safe_dag_id: str, safe_task_id: str, safe_run_id: str) -> str:
        """Generate safe pod identifier."""
        ...
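Pod names must satisfy Kubernetes DNS-1123 rules (lowercase alphanumerics and hyphens, bounded length), which is why the scheduler derives a "safe" pod id from the DAG, task, and run ids. A simplified sketch of that sanitization, not the provider's exact _make_safe_pod_id logic:

```python
import re

# Kubernetes label-style name limit (DNS-1123); illustrative constant.
MAX_POD_NAME_LEN = 63

def make_safe_pod_id(dag_id: str, task_id: str, run_id: str) -> str:
    """Lowercase, replace invalid characters with '-', and truncate."""
    raw = f"{dag_id}-{task_id}-{run_id}".lower()
    safe = re.sub(r"[^a-z0-9-]+", "-", raw)
    return safe.strip("-")[:MAX_POD_NAME_LEN].rstrip("-")

print(make_safe_pod_id("My_DAG", "extract.load", "manual__2023-01-01"))
# my-dag-extract-load-manual-2023-01-01
```

The real implementation also appends a unique suffix so that retries of the same task instance never collide on pod name.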

Resource Version Manager

Singleton class that manages Kubernetes resource versions for efficient watching.

class ResourceVersion(metaclass=Singleton):
    """
    Manages Kubernetes resource versions.
    
    Tracks resource versions for efficient watching of Kubernetes
    resources, enabling incremental updates and reducing API load.
    """
    def __init__(self): ...

    def get_resource_version(self) -> str:
        """Get current resource version."""
        ...

    def set_resource_version(self, version: str) -> None:
        """Set resource version."""
        ...
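The Singleton metaclass guarantees that every part of the executor reads and writes the same resource version. A minimal sketch of the pattern (the provider's implementation may differ in detail, e.g. tracking versions per namespace):

```python
# Minimal Singleton metaclass sketch: the first instantiation is cached
# and returned for every later call.
class Singleton(type):
    _instances: dict = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class ResourceVersion(metaclass=Singleton):
    def __init__(self):
        self._version = "0"

    def get_resource_version(self) -> str:
        return self._version

    def set_resource_version(self, version: str) -> None:
        self._version = version

a = ResourceVersion()
b = ResourceVersion()
a.set_resource_version("12345")
print(a is b)                    # True: same instance everywhere
print(b.get_resource_version())  # 12345
```

Passing the stored version as the watch's starting point lets the watcher resume from where it left off instead of re-listing every pod.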

Utility Functions

Helper functions for executor operations and pod management.

def get_base_pod_from_template(
    pod_template_file: str | None,
    kube_client: ApiClient
) -> V1Pod:
    """
    Get base pod from template.
    
    Args:
        pod_template_file: Path to pod template file
        kube_client: Kubernetes API client
        
    Returns:
        V1Pod: Base pod specification
    """
    ...
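Conceptually, the base pod loaded from the template is then layered with per-task settings. The dict merge below is a simplification of the provider's V1Pod reconciliation, shown only to illustrate the "base plus override" idea:

```python
# Illustrative only: the provider reconciles V1Pod objects, not raw dicts.
def merge_pod_spec(base: dict, override: dict) -> dict:
    """Recursively merge override into base; override wins on conflicts."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_pod_spec(merged[key], value)
        else:
            merged[key] = value
    return merged

base = {"metadata": {"namespace": "airflow"}, "spec": {"restartPolicy": "Never"}}
override = {"metadata": {"labels": {"team": "data"}}}
print(merge_pod_spec(base, override))
# {'metadata': {'namespace': 'airflow', 'labels': {'team': 'data'}},
#  'spec': {'restartPolicy': 'Never'}}
```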

Configuration

The Kubernetes Executor is configured through the airflow.cfg file under the [kubernetes_executor] section:

Core Configuration

[kubernetes_executor]
# Kubernetes namespace for pods
namespace = default

# Pod template file (optional)
pod_template_file = /path/to/pod_template.yaml

# Worker container image
worker_container_repository = apache/airflow
worker_container_tag = 2.7.0

# Delete pods after completion
delete_worker_pods = True
delete_worker_pods_on_failure = False

# In-cluster configuration
in_cluster = True

# Cluster context (for out-of-cluster)
cluster_context = my-cluster

# Config file path (for out-of-cluster)  
config_file = ~/.kube/config

Multi-Namespace Configuration

# Enable multi-namespace mode
multi_namespace_mode = True

# Specific namespaces (when not using cluster role)
multi_namespace_mode_namespace_list = namespace1,namespace2,namespace3

Performance Configuration

# Number of pods to create per scheduler loop
worker_pods_creation_batch_size = 3

# Task publish retry configuration
task_publish_max_retries = 3

# API client retry configuration  
api_client_retry_configuration = {"total": 3, "backoff_factor": 0.5}
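The api_client_retry_configuration value is a JSON object; judging by the option name, its keys are presumably forwarded as keyword arguments to urllib3's Retry (an assumption, not verified here):

```python
import json

# Parse the option value exactly as it appears in airflow.cfg.
raw = '{"total": 3, "backoff_factor": 0.5}'
retry_kwargs = json.loads(raw)
print(retry_kwargs["total"])           # 3
print(retry_kwargs["backoff_factor"])  # 0.5
```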

Resource Management

# Pod termination grace period
termination_grace_period_seconds = 30

# Delete options for pod cleanup
delete_option_kwargs = {"grace_period_seconds": 10}

Usage Examples

Basic Executor Configuration

# airflow.cfg configuration for KubernetesExecutor
[core]
executor = KubernetesExecutor

[kubernetes_executor]
namespace = airflow
worker_container_repository = my-registry/airflow
worker_container_tag = 2.7.0-custom
delete_worker_pods = True
in_cluster = True

Pod Template Configuration

# pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker
  namespace: airflow
spec:
  serviceAccountName: airflow-worker
  securityContext:
    runAsUser: 50000
    runAsGroup: 50000
    fsGroup: 50000
  containers:
  - name: base
    image: apache/airflow:2.7.0
    resources:
      requests:
        memory: "512Mi"
        cpu: "500m"
      limits:
        memory: "1Gi"
        cpu: "1000m"
    env:
    - name: AIRFLOW__CORE__EXECUTOR
      value: LocalExecutor
    - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
      valueFrom:
        secretKeyRef:
          name: airflow-secrets
          key: connection-string
    volumeMounts:
    - name: airflow-dags
      mountPath: /opt/airflow/dags
    - name: airflow-logs
      mountPath: /opt/airflow/logs
  volumes:
  - name: airflow-dags
    persistentVolumeClaim:
      claimName: airflow-dags-pvc
  - name: airflow-logs
    persistentVolumeClaim:
      claimName: airflow-logs-pvc
  restartPolicy: Never

Local Kubernetes Executor Setup

# airflow.cfg for LocalKubernetesExecutor
[core]
executor = LocalKubernetesExecutor

[local_kubernetes_executor]
kubernetes_queue = kubernetes

[kubernetes_executor]
namespace = airflow-k8s
worker_container_repository = my-registry/airflow
delete_worker_pods = True

# Tasks outside the 'kubernetes' queue run on the wrapped LocalExecutor;
# no additional executor configuration is needed for them.

Task with Kubernetes Executor Config

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def my_task():
    print("Running on Kubernetes!")
    return "Task completed"

dag = DAG(
    'kubernetes_executor_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
)

# Task with custom executor config.
# Since Airflow 2.0, per-task customization uses a "pod_override" V1Pod;
# the legacy flat dictionary of keys (namespace, image, request_memory, ...)
# is no longer supported.
from kubernetes.client import models as k8s

k8s_task = PythonOperator(
    task_id='kubernetes_task',
    python_callable=my_task,
    executor_config={
        'pod_override': k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(
                namespace='custom-namespace',
                labels={'team': 'data-engineering'},
                annotations={'monitoring': 'enabled'},
            ),
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',  # must be 'base' to override the main container
                        image='custom-image:latest',
                        resources=k8s.V1ResourceRequirements(
                            requests={'memory': '1Gi', 'cpu': '500m'},
                            limits={'memory': '2Gi', 'cpu': '1000m'},
                        ),
                        env=[
                            k8s.V1EnvVar(name='CUSTOM_VAR', value='custom_value'),
                            k8s.V1EnvVar(
                                name='DB_PASSWORD',
                                value_from=k8s.V1EnvVarSource(
                                    secret_key_ref=k8s.V1SecretKeySelector(
                                        name='database-secret', key='password'
                                    )
                                ),
                            ),
                        ],
                        volume_mounts=[
                            k8s.V1VolumeMount(name='data-volume', mount_path='/data'),
                        ],
                    ),
                ],
                volumes=[
                    k8s.V1Volume(
                        name='data-volume',
                        persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
                            claim_name='data-pvc'
                        ),
                    ),
                ],
                node_selector={'disktype': 'ssd'},
                affinity=k8s.V1Affinity(
                    node_affinity=k8s.V1NodeAffinity(
                        required_during_scheduling_ignored_during_execution=k8s.V1NodeSelector(
                            node_selector_terms=[
                                k8s.V1NodeSelectorTerm(
                                    match_expressions=[
                                        k8s.V1NodeSelectorRequirement(
                                            key='kubernetes.io/arch',
                                            operator='In',
                                            values=['amd64'],
                                        )
                                    ]
                                )
                            ]
                        )
                    )
                ),
                tolerations=[
                    k8s.V1Toleration(
                        key='compute-type',
                        operator='Equal',
                        value='gpu',
                        effect='NoSchedule',
                    )
                ],
            ),
        )
    },
    dag=dag,
)

Queue-Based Executor Routing

# Tasks for LocalKubernetesExecutor with queue routing

# This task runs locally (default queue)
local_task = PythonOperator(
    task_id='local_task',
    python_callable=lambda: print("Running locally!"),
    dag=dag
)

# This task runs on Kubernetes (kubernetes queue)
k8s_task = PythonOperator(
    task_id='k8s_task',
    python_callable=lambda: print("Running on Kubernetes!"),
    queue='kubernetes',  # Routes to KubernetesExecutor
    dag=dag
)

# Custom queue name (configure in airflow.cfg)
from kubernetes.client import models as k8s

gpu_task = PythonOperator(
    task_id='gpu_task',
    python_callable=lambda: print("Running on GPU nodes!"),
    queue='gpu-queue',  # Custom queue for GPU nodes
    executor_config={
        'pod_override': k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(namespace='gpu-namespace'),
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name='base',
                        resources=k8s.V1ResourceRequirements(
                            limits={'nvidia.com/gpu': '1'}
                        ),
                    )
                ],
                node_selector={'accelerator': 'nvidia-gpu'},
                tolerations=[
                    k8s.V1Toleration(
                        key='nvidia.com/gpu',
                        operator='Exists',
                        effect='NoSchedule',
                    )
                ],
            ),
        )
    },
    dag=dag
)

High Availability Configuration

# airflow.cfg for HA Kubernetes Executor setup
[kubernetes_executor]
namespace = airflow-prod

# Multi-namespace for isolation
multi_namespace_mode = True
multi_namespace_mode_namespace_list = airflow-prod,airflow-staging

# High throughput configuration
worker_pods_creation_batch_size = 10
task_publish_max_retries = 5

# Pod cleanup configuration
delete_worker_pods = True
delete_worker_pods_on_failure = False

# Fatal container states that should fail tasks immediately
worker_pod_pending_fatal_container_state_reasons = CreateContainerConfigError,ErrImagePull,CreateContainerError,ImageInspectError,InvalidImageName

# Networking configuration
enable_tcp_keepalive = True
tcp_keep_idle = 120
tcp_keep_intvl = 30
tcp_keep_cnt = 6

# SSL configuration for production
verify_ssl = True
ssl_ca_cert = /etc/ssl/certs/ca-certificates.crt

# Resource management
delete_option_kwargs = {"grace_period_seconds": 30, "propagation_policy": "Foreground"}
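The fatal-reason check above can be illustrated with a small sketch: if a pending pod's container is waiting for one of the configured reasons, the task can be failed immediately instead of waiting for a timeout. The function and data shapes here are hypothetical, not the provider's internals:

```python
# Mirrors worker_pod_pending_fatal_container_state_reasons from the config.
FATAL_REASONS = {
    "CreateContainerConfigError", "ErrImagePull", "CreateContainerError",
    "ImageInspectError", "InvalidImageName",
}

def has_fatal_waiting_reason(container_statuses: list[dict]) -> bool:
    """Return True if any container is waiting for a fatal reason."""
    for status in container_statuses:
        waiting = status.get("waiting") or {}
        if waiting.get("reason") in FATAL_REASONS:
            return True
    return False

print(has_fatal_waiting_reason([{"waiting": {"reason": "ErrImagePull"}}]))       # True
print(has_fatal_waiting_reason([{"waiting": {"reason": "ContainerCreating"}}]))  # False
```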

Monitoring and Observability

# Task with enhanced monitoring configuration (pod_override form)
from kubernetes.client import models as k8s

monitored_task = PythonOperator(
    task_id='monitored_task',
    python_callable=my_task,
    executor_config={
        'pod_override': k8s.V1Pod(
            metadata=k8s.V1ObjectMeta(
                labels={
                    'app': 'airflow',
                    'component': 'worker',
                    'version': '2.7.0',
                    'team': 'data-platform',
                },
                annotations={
                    'prometheus.io/scrape': 'true',
                    'prometheus.io/port': '8080',
                    'logging.coreos.com/enabled': 'true',
                },
            ),
        )
    },
    dag=dag
)

Cleanup and Maintenance

def cleanup_completed_pods(**context):
    """Clean up completed pods older than specified age."""
    from airflow.providers.cncf.kubernetes.hooks.kubernetes import KubernetesHook
    from datetime import datetime, timedelta, timezone
    
    hook = KubernetesHook(conn_id='kubernetes_default')
    core_v1 = hook.core_v1_client  # CoreV1Api client
    
    # Get all worker pods in the airflow namespace
    pods = core_v1.list_namespaced_pod(
        namespace='airflow',
        label_selector='airflow-worker=true'
    )
    
    # creation_timestamp is timezone-aware, so compare against an aware datetime
    cutoff_time = datetime.now(timezone.utc) - timedelta(hours=24)
    
    for pod in pods.items:
        # Check if pod is completed and old
        if (pod.status.phase in ['Succeeded', 'Failed'] and 
            pod.metadata.creation_timestamp < cutoff_time):
            
            core_v1.delete_namespaced_pod(
                name=pod.metadata.name,
                namespace='airflow',
                grace_period_seconds=0
            )
            print(f"Deleted pod: {pod.metadata.name}")

cleanup_task = PythonOperator(
    task_id='cleanup_old_pods',
    python_callable=cleanup_completed_pods,
    dag=dag  # scheduling (e.g. schedule='@daily') belongs on the DAG, not the operator
)

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-cncf-kubernetes
